Fork/Join框架了解吗?

Fork/Join框架了解吗?
Fork/Join框架是Java 7引入的一种用于并行执行任务的框架。它将大任务划分为多个小任务,并行执行这些小任务,最后将各个小任务的结果合并得到大任务的结果。
要理解Fork/Join框架,需要关注两个关键点:分而治之和工作窃取算法。
分而治之
Fork/Join框架的设计体现了分而治之的思想。它将一个规模为N的问题划分为K个规模较小的子问题,这些子问题相互独立且与原问题具有相同的性质。通过解决子问题,可以得到原问题的解。
工作窃取算法
Fork/Join框架使用了工作窃取算法来提高并行任务的执行效率。该算法基于线程池的设计,其中每个线程都维护自己的任务队列。当一个线程完成了自己的任务后,它会从其他线程的任务队列中窃取任务来执行,以保持线程的高效利用。
工作窃取算法
大任务拆成了若干个小任务,把这些小任务放到不同的队列里,各自创建单独线程来执行队列里的任务。
那么问题来了,有的线程干活块,有的线程干活慢。干完活的线程不能让它空下来,得让它去帮没干完活的线程干活。它去其它线程的队列里窃取一个任务来执行,这就是所谓的工作窃取。
工作窃取发生的时候,它们会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常任务会使用双端队列,被窃取任务线程永远从双端队列的头部拿,而窃取任务的线程永远从双端队列的尾部拿任务执行。
看一个Fork/Join框架应用的例子,计算1~n之间的和:1+2+3+…+n
- 设置一个分割阈值,任务大于阈值就拆分任务
- 任务有结果,所以需要继承RecursiveTask
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 16; // 阈值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork(); // 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join(); // 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(); // 生成一个计算任务,负责计算1+2+3+4
CountTask task = new CountTask(1, 100); // 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
}
ForkJoinTask与普通任务的主要区别在于它需要实现compute
方法。在compute
方法中,首先需要判断任务是否足够小,如果足够小,就直接执行任务。如果任务较大,则需要将任务分割成两个子任务。每个子任务在调用fork
方法时会进入compute
方法,检查当前子任务是否需要进一步分割成子任务。如果不需要继续分割,就执行当前子任务并返回结果。使用join
方法可以等待子任务执行完毕并获取其结果。