Disruptor是什么
Disruptor是一个高性能的 Java 并发编程框架(线程间异步通信),是英国外汇交易公司LMAX开发的一个高性能队列,基于Disruptor开发的系统单线程能支撑每秒600万订单。
Disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
目前,包括Apache Storm、Camel、Log4j 2等在内的很多知名项目都应用了Disruptor以获取高性能。
为什么需要Disruptor?
在Java开发工作中,我们经常会用到 JDK 自带的队列ArrayBlockingQueue ,但是在高性能的场景ArrayBlockingQueue 有如下两个问题:
- ArrayBlockingQueue 是通过加锁的方式来保证线程安全,频繁的加锁解锁会严重影响性能;
- 存在伪共享, ArrayBlockingQueue 不能充分利用计算机缓存层次结构的缓存行,严重影响性能。
Disruptor就是针对以上的问题进行改进,从而提升性能,实现高性能队列。
所以要搞懂Disruptor,接下来我们就需要重点抓住锁与伪共享这两点,下面我一一详解。
Disruptor高性能原理解析
提升锁的性能
现实编程过程中,加锁通常会严重地影响性能,线程会因为竞争不到锁而被挂起。
等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。
如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。
如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。
Disruptor论文中讲述了一个实验:
- 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
- 机器环境:2.4G 6核
- 运算: 64位的计数器累加5亿次
通过上面的数据结果,可以看出CAS操作比单线程无锁慢了1个数量级,有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级,可见无锁速度最快。
单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。
在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。
通过上面测试,就很清楚为什么不采用ArrayBlockingQueue,就是因为用了重量级lock锁,在我们加锁过程中我们会把锁挂起,解锁后,又会把线程恢复,这一过程会有一定的开销,并且我们一旦没有获取锁,这个线程就只能一直等待,这个线程什么事也不能做。
所以Disruptor基于CAS(compare and swap)来操作,顾名思义先比较在交换,一般是比较是否是老的值,如果是的进行交换设置,大家熟悉乐观锁的人都知道CAS可以用来实现乐观锁,CAS中没有线程的上下文切换,减少了不必要的开销。
如果对CAS不熟悉的同学,可以点击查看CAS实现原理解析这篇文章。
伪共享问题
CPU缓存
缓存层级越接近于 CPU core,容量越小,速度越快,当 CPU 执行运算的时候,它先去 L1 查找所需的数据,再去 L2,然后是 L3,最后如果这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。
缓存行
缓存行 (Cache Line) 是 CPU Cache 中的最小单位,CPU Cache 由若干缓存行组成,一个缓存行的大小通常是 64 字节(这取决于 CPU),并且它有效地引用主内存中的一块地址。一个 Java 的 long 类型是 8 字节,因此在一个缓存行中可以存 8 个 long 类型的变量。
CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line。
在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此你能非常快的遍历这个数组。事实上,你可以非常快速的遍历在连续内存块中分配的任意数据结构。
利用CPU缓存-示例
伪共享
如果多个线程的变量共享了同一个 CacheLine,任意一方的修改操作都会使得整个 CacheLine 失效(因为 CacheLine 是 CPU 缓存的最小单位),也就意味着,频繁的多线程操作,CPU 缓存将会彻底失效,降级为 CPU core 和主内存的直接交互。
如何解决伪共享(字节填充)
环形数组结构RingBuffer
除了上面谈到的两个性能问题:一个锁,一个伪共享,还引入了其它什么先进的东西提升性能的?环形数组结构也是其中之一。
在Disruptor中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。
当然其不仅解决了数组快速访问的问题,也解决了不需要再次分配内存的问题,减少了垃圾回收,因为我们0,10,20等都是执行的同一片内存区域,这样就不需要再次分配内存,频繁的被JVM垃圾回收器回收。
实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度。 如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。
Disruptor生产者和消费者
生产者写入数据
写入数据的步骤包括:
1.占位;
2.移动游标并填充数据;
需要考虑的问题:
1.如何避免生产者的生产速度过快而造成的新消息覆盖了未被消费的旧消息的问题;
2.如何解决多个生产者抢占生产位的问题;
1.如何避免生产者的生产速度过快而造成的新消息覆盖了未被消费的旧消息的问题;
答:生产者再获取占位之前需要查看当前最慢的消费者位置,如果当前要发布的位置比消费者大,就等待;
2.如何解决多个生产者抢占生产位的问题;
多个生产者通过CAS获取生产位;
消费者读取数据
说明:
1.一个消费者一个线程;
2.每个消费者都有一个游标表示已经消费到哪了(Sequence);
3.消息者会等待(waitFor)新数据,直到生产者通知(signal);
需要考虑的问题:
如何防止读取的时候,读到还未写的元素?
WaitStrategy(等待策略):
BlockingWaitStrategy:默认策略,没有获取到任务的情况下线程会进入等待状态。cpu 消耗少,但是延迟高。
TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常。
BusySpinWaitStrategy:线程一直自旋等待。cpu 占用高,延迟低.
YieldingWaitStrategy:尝试自旋 100 次,然后调用 Thread.yield() 让出 cpu。cpu 占用高,延迟低。
SleepingWaitStrategy:尝试自旋 100 此,然后调用 Thread.yield() 100 次,如果经过这两百次的操作还未获取到任务,就会尝试阶段性挂起自身线程。此种方式是对cpu 占用和延迟的一种平衡,性能不太稳定。
生产者写入数据示例1
生产者写入数据示例2
Disruptor使用源码案例
举一个例子:有一个消费者,有一个生产者,还有一个生产消费队列。
生产者:生产消息,将消息放入队列 消费者:消费队列中的消息
1.队列消息对象(Message)
public class Message { String id; public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public String toString() { return "Message{" + "id='" + id + '\'' + '}'; } }
2.生产者
public class MessageProducer { // disrupt 会调用该方法放入队列 private final static EventTranslatorVararg<Message> translator = (message, seq, objs) -> message.setId((String) objs[0]); private final RingBuffer<Message> ringBuffer; public MessageProducer(RingBuffer<Message> ringBuffer) { this.ringBuffer = ringBuffer; } // 供外部调用 public void producerNewMessage(String messageId) { this.ringBuffer.publishEvent(translator, messageId); } }
3.消费者
public class MessageDistributionHandler implements EventHandler<Message> { @Override public void onEvent(Message message, long sequence, boolean endOfBatch) throws Exception { System.out.println("分发消息 " + message); } }
4.Disruptor调用
public class Main { public static void main(String[] args) { // 创建 disruptor Disruptor<Message> disruptor = new Disruptor<>( Message::new, 1024, (ThreadFactory) Thread::new); // 设置消费者 disruptor.handleEventsWith(new MessageDistributionHandler()); // 启动 disruptor disruptor.start(); // 创建生产者 MessageProducer producer = new MessageProducer(disruptor.getRingBuffer()); // 生产消息 for(int i = 0; i<10; i++){ producer.producerNewMessage(String.valueOf(i)); } // 关闭 disruptor disruptor.shutdown();//该方法会阻塞,直至 disruptor 中所有的事件都被处理; } }
mikechen睿哥
mikechen睿哥,十余年BAT架构经验,资深技术专家,就职于阿里、淘宝、百度等一线互联网大厂。
关注「mikechen」公众号,获取更多技术干货!
后台回复【面试】即可获取《史上最全阿里Java面试题总结》,后台回复【架构】,即可获取《阿里架构师进阶专题全部合集》