在使用java多线程解决问题的时候为了提高效率,我们常会异步处理一些计算任务并在最后异步的获取计算结果,这个过程的实现离不开Future接口及其实现类FutureTask。
下面我就重点讲解Java线程池的的FutureTask的使用与实现原理@mikechen
FutureTask介绍
FutureTask是java5引入的一个类,从名字可以看出来FutureTask既是一个Future,又是一个Task,。
FutureTask实现了Future接口,完成了对Future接口的基本实现,除了实现了Future接口以外,FutureTask还实现了Runnable接口,代码定义如下:
public class FutureTask<V> implements RunnableFuture<V> { ... }
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
因此,FutureTask具备了Runnable和Future的所有特性,可以交给 Executor执行,也可以由调用线程直接执行(FutureTask.run())。
FutureTask应用场景
FutureTask用于在异步操作场景中,FutureTask作为生产者(执行FutureTask的线程)和消费者(获取FutureTask结果的线程)的桥梁。
如果生产者先生产出了数据,那么消费者get时能会直接拿到结果,如果生产者还未产生数据,那么get时会一直阻塞或者超时阻塞,一直到生产者产生数据唤醒阻塞的消费者为止。
FutureTask实现
FutureTask的老版本实现是基于AQS,这样会有一个缺点,当多个线程对同一个FutureTask执行cancel的时候,FutureTask会把中断状态保留起来,让调用方感到奇怪。
所以,FutureTask的新版本实现修改为用state字段+CAS操作进行同步控制,再用简化版的Treiber栈保存等待线程,虽然FutureTask没有使用AQS,但是其实现原理非常类似于AQS。
1. 用一个volatile int state变量作为同步状态
AQS中也有state,但AQS不会使用在state中保存的内容,只是对它执行CAS操作,看是否成功而已。
与AQS不同,FutureTask需要用到state变量中的内容,state所有可能的取值有7个,分别如下:
public class FutureTask<V> implements RunnableFuture<V> { ... private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; ... public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } ... }
- NEW = 0; 初始状态,FutureTask刚被创建,正在计算中都是该状态。
- COMPLETING = 1; 中间状态,表示计算已完成正在对结果进行赋值,或正在处理异常
- NORMAL = 2; 终止状态,表示计算已完成,结果已经被赋值。
- EXCEPTIONAL = 3; 终止状态,表示计算过程已经被异常打断。
- CANCELLED = 4; 终止状态,表示计算过程已经被cancel操作终止。
- INTERRUPTING = 5; 中间状态,表示计算过程已开始并且被中断,正在修改状态。
- INTERRUPTED = 6; 终止状态,表示计算过程已开始并且被中断,目前已完全停止。
可能的状态过渡:
1、NEW -> COMPLETING -> NORMAL:正常结束 2、NEW -> COMPLETING -> EXCEPTIONAL:异常结束 3、NEW -> CANCELLED:任务被取消 4、NEW -> INTERRUPTING -> INTERRUPTED:任务出现中断
在FutureTask中,state只会被set(), setException(),cancel()修改为终止状态(NORMAL, EXCEPTIONAL, CANCELED, INTERRUPTED)。
要注意是这个state变量被声明为volatile,不仅保证了它自己的可见性,还保证了FutureTask类其他成员属性的可见性。
2. 用一个Treiber栈保存等待线程
Treiber栈是一个无锁数据结构,FutureTask中的waiters变量指向这个栈的栈顶,名字有点吓人,其实很简单,就是一个无锁的线程安全的栈。
入栈操作只通过一步CAS操作实现,即修改栈顶指针waiters,出栈和在栈的中间执行删除操作通过特定的循环操作实现。
FutureTask使用
FutureTask实现了RunnableFuture接口,而RunnableFuture接口扩展自Future Runnable接口,在创建FutureTask时可以使用Callable接口的实例或者Lambda表达式,也可以使用Runnable的实例,但内部还是会使用适配器模式转换成Callable实例类型。
可以把FutureTask交给Executor执行,也可以通ExecutorService.submit(…)方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法,除此以外,还可以单独使用FutureTask。
当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask。
假设有多个线程执行若干任务,每个任务最多只能被执行一次,当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行。
下面是对应的示例代码:
private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>(); private String executionTask(final String taskName)throws ExecutionException, InterruptedException { while (true) { Future<String> future = taskCache.get(taskName); // 1.1,2.1 if (future == null) { Callable<String> task = () -> taskName; FutureTask<String> futureTask = new FutureTask<>(task); future = taskCache.putIfAbsent(taskName, futureTask); // 1.3 if (future == null) { future = futureTask; futureTask.run(); // 1.4执行任务 } } try { return future.get(); // 1.5, } catch (CancellationException e) { taskCache.remove(taskName, future); } } }
FutureTask总结
FutureTask是JDK并发包为Future接口提供的一个实现,代表一个支持取消操作(cancel)的异步计算任务,它实现了Future接口和Runnable接口,所以既是计算任务对象也是结果对象。它可以提交到线程池中去执行,并且结果直接放在自身这个FutureTask中,不是放在另外一个Future中。
mikechen睿哥
mikechen睿哥,十余年BAT架构经验,资深技术专家,就职于阿里、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》