ForkJoin最全详解(从原理设计到使用图解)

ForkJoin最全详解(从原理设计到使用图解)-mikechen的互联网架构

ForkJoin是并发编程中一个比较重要的知识技能,ForkJoin并行框架能方便的利用多核处理器的计算能力实现并发任务的拆分,建议重点掌握@mikechen

以下我会从ForkJoin框架的整体设计、原理、算法、以及使用案例来全面详解ForkJoin。

ForkJoin简介

从JDK1.7开始,Java提供ForkJoin框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。

ForkJoin最全详解(从原理设计到使用图解)-mikechen的互联网架构

 

工作窃取算法

Fork/Join 框架采用了工作窃取(work-stealing)算法来实现,其算法核心是指某个线程从其他队列里窃取任务来执行。

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。

但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理,干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。

而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行,其工作原理如下图所示:

ForkJoin最全详解(从原理设计到使用图解)-mikechen的互联网架构

为什么要采用工作窃取算法来实现?

因为当我们需要处理一个大任务时,我们会把这个大任务分割为若干互不依赖的子任务。

为了减少线程间的竞争,我们会把这些子任务分别放到不同的队列里,然后为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。

各个线程执行效率各有差异,为了提高效率,引入“工作窃取算法”,允许“快线程”窃取“慢线程”任务执行

工作窃取算法充分利用多线程进行并行计算,提高了执行效率,同时使用双端队列减少了线程间的冲突竞争,然后不能完全避免冲突,比如某个任务队列中仅有一个任务的时候,两个线程同时竞争。

 

ForkJoin核心设计

除了要了解ForkJoin整个的实现思路, 还需要了解ForkJoin为了实现这个框架定义了哪些角色,并了解这些角色的职责范围是什么的。

ForkJoin框架核心设计类为:任务(ForkJoinTask)和线程池(ForkJoinPool),并发任务执行流程图下图所示:

ForkJoin最全详解(从原理设计到使用图解)-mikechen的互联网架构

下面分别谈谈两者的核心作用。

ForkJoinPool

既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool,充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。

它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。

它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue,然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。

它与其它的ExecutorService区别主要在于它使用“工作窃取“,主要管理ForkJoin框架里面线程的执行。

ForkJoinTask

我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务,而这个类就是一个将在ForkJoinPool中执行的任务的基类。

ForkJoin框架提供了在一个任务里执行fork和join操作的机制和控制任务状态的方法。

通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类(继承于ForkJoinTask)。

它的两个子类分别是:RecursiveAction、RecursiveTask,下面简要谈谈两者的区别。

RecursiveAction:是并发包内现成的ForkJoinTask实现之一,继承自ForkJoinTask,负责处理那些不需要返回结果的任务。

RecursiveTask:也是并发包内现成的ForkJoinTask实现之一,继承自ForkJoinTask,负责处理那些需要返回结果的任务。

这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值,下面是二者的类图关系结构:

ForkJoin最全详解(从原理设计到使用图解)-mikechen的互联网架构

上面主要谈了ForkJoin并发任务执行的核心设计,下面我们就来看看如何去使用ForkJoin。

 

ForkJoin的使用案例

在使用之前,我们需要记住两个概念:

ForkJoinTask,我们需要自己定义一个ForkJoinTask,用来定义我们需要执行的任务,它可以继承RecursiveAction或者RecursiveTask,从而获取fork和join操作的能力,前者表示不需要返回值,后者则是需要返回值。

ForkJoinPool类似于线程池,连接池的概念,ForkJoinTask都需要交给ForkJoinPool才能执行。

我们通过一个求和的例子来说明ForkJoin的流程,现在我们要对1到8的整数进行求和,代码如下:

package com.mikechen.concurrent.other;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
 
/**
 * ForkJoin计算1+2+3+4+5+6+7+8
 *
 * @author mikechen
 */
public class ForkJoinTaskTest extends RecursiveTask<Integer>  {
 
    public static final int threshold = 2;
    private int start;
    private int end;
 
    public ForkJoinTaskTest(int start, int end) {
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Integer compute() {
        int sum = 0;
 
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
 
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
 
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
 
            ForkJoinTaskTest leftTask = new ForkJoinTaskTest(start, middle);
            ForkJoinTaskTest rightTask = new ForkJoinTaskTest(middle + 1, end);
 
            // 执行子任务
            leftTask.fork();
            rightTask.fork();
 
            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
 
            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
 
        ForkJoinPool forkjoinPool = new ForkJoinPool();
 
        //生成一个计算任务,计算1+2+3+4+5+6+7+8
        ForkJoinTaskTest task = new ForkJoinTaskTest(1, 8);
 
        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);
 
        try {
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

我们在compute中定义任务的拆分粒度和最小任务的执行逻辑,并通过fork和join能力来实现多线程并发,当子任务调用fork的时候,会继续执行子任务的compute,当子任务调用join的时候,会等待其所有子孙任务的执行结果。

这里把1+2+3+4+5+6+7+8这个大任务最终被拆分成了四个子任务,如下图所示:

ForkJoin最全详解(从原理设计到使用图解)-mikechen的互联网架构

分别进行递归fork不断拆分,成为四个子任务:

  1. 1+2
  2. 3+4
  3. 5+6
  4. 7+8

最终再等待子任务执行完毕,合并得到最后的执行结果。

ForkJoin小结

ForkJoin使用分治算法实现的,主要的原理就是将一个大的任务拆分为若干个小任务分发给若干个线程去处理,最后将若干的线程处理好后的结果进行汇总,从而达到提升计算效率的结果。

ForkJoin它是一项可以获得良好的并行性能的简单且高效的设计技术,目的是为了帮助我们更好地利用多处理器并行运算能力来提升应用的性能。

以上我就分别从ForkJoin的设计原理、再到算法、以及使用案例进行完整的详解,希望对你有所帮助!

ForkJoin属于并发编程比较重要的一个知识技能,如果还想全面了解Java并发编程的内容,可以点击查看:Java多线与并发编程从0到1全部合集

以上!


关注「mikechen的互联网架构」公众号,回复【架构】领取我原创的《300期+BAT架构技术系列与1000+大厂面试题答案》

mikechen的互联网架构
评论交流
    说点什么吧…