ForkJoin 学习使用笔记

文章目录
  1. ForkJoin 学习使用笔记
    1. I. 背景
    2. II. ForkJoin 基本知识
      1. 1. 任务分割
      2. 2. 结果合并
    3. III. 使用说明
      1. 1. 累加
      2. 2. 排序
    4. IV. 其他
      1. 参考
      2. 个人博客: Z+|blog
      3. 声明
      4. 扫描关注

ForkJoin 学习使用笔记

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架

I. 背景

在日常的业务需求中,经常出现的批量查询,批量写入等接口的提供,一般来说,最简单最low的方式就是写一个for循环来一次执行,但是当业务方对接口的性能要求较高时,就比较尴尬了

通常可以想到的方式是采用并发操作,首先想到可以实现的方式就是利用线程池来做

通常实现方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. 创建线程池

ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("biz-exec"),
new ThreadPoolExecutor.CallerRunsPolicy());

// 2. 创建执行任务
List<Future<Object>> futureList = new ArrayList<>();
for(Object arg : list) {
futureList.add(executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
// xxx
}
}));
}

// 3. 结果获取
for(Future f: futureList) {
Object obj = f.get();
}

用上面的这种方式并没有什么问题,我们接下来考虑的是如何使用ForkJoin框架来实现类似的功能

II. ForkJoin 基本知识

Fork: 将大任务拆分成若干个可以并发执行的小任务

Join: 合并所有小任务的执行结果

forkjoin

1. 任务分割

ForkJoinTask : 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务

说明:

  1. fork : 让task异步执行
  2. join : 让task同步执行,可以获取返回值
  3. ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

2. 结果合并

ForkJoinPool 执行 ForkJoinTask

  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

三中提交方式:

  1. execute 异步,无返回结果
  2. submit 异步,有返回结果 (返回Future<T>
  3. invoke 同步,有返回结果 (会阻塞)

III. 使用说明

结合两个场景,给出使用姿势

1. 累加

实现从 start - end 的累加求和

首先是定义一个CountTask 来实现求和

首先是确定任务分割的阀值,当 end-start 的差值大于阀值时,将任务一分为二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CountTask extends RecursiveTask<Integer> {

private int start;
private int end;

private static final int THRED_HOLD = 30;


public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRED_HOLD;
if (canCompute) { // 不需要拆分
for (int i = start; i <= end; i++) {
sum += i;
}

System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end);
} else {
int mid = (end + start) / 2;
CountTask left = new CountTask(start, mid);
CountTask right = new CountTask(mid + 1, end);
left.fork();
right.fork();

sum = left.join() + right.join();
}
return sum;
}
}

调用case

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testFork() throws ExecutionException, InterruptedException {
int start = 0;
int end = 200;

CountTask task = new CountTask(start, end);
ForkJoinPool pool = ForkJoinPool.commonPool();
Future<Integer> ans = pool.submit(task);
int sum = ans.get();
System.out.println(sum);
}

输出结果:

1
2
3
4
5
6
7
8
9
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 200
20100

2. 排序

int 数组进行排序

同样先定义一个SortTask, 主要是为了演示ForkJoin的使用姿势,具体的排序和合并的逻辑比较简陋的实现了一下(这块不是重点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class SortTask extends RecursiveTask<List<Integer>> {

private List<Integer> list;

private final static int THRESHOLD = 5;

public SortTask(List<Integer> list) {
this.list = list;
}

@Override
protected List<Integer> compute() {
if (list.size() < THRESHOLD) {
Collections.sort(list);

System.out.println("thread: " + Thread.currentThread() + " sort: " + list);
return list;
}


int mid = list.size() >> 1;


SortTask l = new SortTask(list.subList(0, mid));
SortTask r = new SortTask(list.subList(mid, list.size()));

l.fork();
r.fork();

List<Integer> left = l.join();
List<Integer> right = r.join();

return merge(left, right);
}

private List<Integer> merge(List<Integer> left, List<Integer> right) {
List<Integer> result = new ArrayList<>(left.size() + right.size());

int rightIndex = 0;
for (int i = 0; i < left.size(); i++) {
if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) {
result.add(left.get(i));
} else {
result.add(right.get(rightIndex++));
i -= 1;
}
}

if (rightIndex < right.size()) {
result.addAll(right.subList(rightIndex, right.size()));
}

return result;
}
}

测试case和上面基本一样,我们改用 invoke 替换上面的 submit

1
2
3
4
5
6
7
8
@Test
public void testMerge() throws ExecutionException, InterruptedException {
List<Integer> list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23);
SortTask sortTask = new SortTask(list);
ForkJoinPool pool = ForkJoinPool.commonPool();
List<Integer> ans = pool.invoke(sortTask);
System.out.println(ans);
}

输出结果

1
2
3
4
5
6
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]
thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]
thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]
thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123]
[6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]

IV. 其他

参考

个人博客: Z+|blog

基于hexo + github pages搭建的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因本人能力一般,见识有限,如发现bug或者有更好的建议,随时欢迎批评指正

扫描关注

QrCode

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×