Disruptor是什么?
轮胎:RingBuffer
RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:
<a href="http://s1.51cto.com/wyfs02/M01/8B/F8/wKioL1heKq_wpmu8AAA9zSBGoCQ743.png" target="_blank"></a>
数组
这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。
序号
RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。
由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。
无锁的机制
在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:
一个生产者 + 一个消费者
生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。
一个生产者 + 多个消费者
多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。
多个生产者 + N个消费者
很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。
Disruptor初体验:简单的生产者和消费者
业务数据对象POJO(Event)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<code>public</code> <code>class</code> <code>Order {</code>
<code> </code><code>//订单ID</code>
<code> </code><code>private</code> <code>long</code> <code>id;</code>
<code> </code><code>//订单信息</code>
<code> </code><code>private</code> <code>String info;</code>
<code> </code><code>//订单价格</code>
<code> </code><code>private</code> <code>double</code> <code>price;</code>
<code> </code><code>public</code> <code>long</code> <code>getId() {</code>
<code> </code><code>return</code> <code>id;</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>void</code> <code>setId(</code><code>long</code> <code>id) {</code>
<code> </code><code>this</code><code>.id = id;</code>
<code> </code><code>public</code> <code>String getInfo() {</code>
<code> </code><code>return</code> <code>info;</code>
<code> </code><code>public</code> <code>void</code> <code>setInfo(String info) {</code>
<code> </code><code>this</code><code>.info = info;</code>
<code> </code><code>public</code> <code>double</code> <code>getPrice() {</code>
<code> </code><code>return</code> <code>price;</code>
<code> </code><code>public</code> <code>void</code> <code>setPrice(</code><code>double</code> <code>price) {</code>
<code> </code><code>this</code><code>.price = price;</code>
<code>}</code>
业务数据工厂(Factory)
<code>public</code> <code>class</code> <code>OrderFactory </code><code>implements</code> <code>EventFactory{</code>
<code> </code><code>@Override</code>
<code> </code><code>public</code> <code>Object newInstance() {</code>
<code> </code><code>System.out.println(</code><code>"OrderFactory.newInstance"</code><code>);</code>
<code> </code><code>return</code> <code>new</code> <code>Order();</code>
事件处理器(Handler,即消费者处理逻辑)
<code>public</code> <code>class</code> <code>OrderHandler </code><code>implements</code> <code>EventHandler<Order>{</code>
<code> </code><code>public</code> <code>void</code> <code>onEvent(Order order, </code><code>long</code> <code>l, </code><code>boolean</code> <code>b) </code><code>throws</code> <code>Exception {</code>
<code> </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 消费者处理中:"</code> <code>+ l);</code>
<code> </code><code>order.setInfo(</code><code>"info"</code> <code>+ order.getId());</code>
<code> </code><code>order.setPrice(Math.random());</code>
Main
36
37
38
39
<code>public</code> <code>class</code> <code>Main {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>InterruptedException {</code>
<code> </code><code>//创建订单工厂</code>
<code> </code><code>OrderFactory orderFactory = </code><code>new</code> <code>OrderFactory();</code>
<code> </code><code>//ringbuffer的大小</code>
<code> </code><code>int</code> <code>RINGBUFFER_SIZE = </code><code>1024</code><code>;</code>
<code> </code><code>//创建disruptor</code>
<code> </code><code>Disruptor<Order> disruptor = </code><code>new</code> <code>Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());</code>
<code> </code><code>//设置事件处理器 即消费者</code>
<code> </code><code>disruptor.handleEventsWith(</code><code>new</code> <code>OrderHandler());</code>
<code> </code><code>disruptor.start();</code>
<code> </code><code>RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();</code>
<code> </code><code>//-------------生产数据</code>
<code> </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i < </code><code>3</code> <code>; i++){</code>
<code> </code><code>long</code> <code>sequence = ringBuffer.next();</code>
<code> </code><code>Order order = ringBuffer.get(sequence);</code>
<code> </code><code>order.setId(i);</code>
<code> </code><code>ringBuffer.publish(sequence);</code>
<code> </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 生产者发布一条数据:"</code> <code>+ sequence + </code><code>" 订单ID:"</code> <code>+ i);</code>
<code> </code><code>}</code>
<code> </code><code>Thread.sleep(</code><code>1000</code><code>);</code>
<code> </code><code>disruptor.shutdown();</code>
运行结果:
<a href="http://s2.51cto.com/wyfs02/M00/8B/FE/wKiom1heUkvDVrbbAABB1RkRt-k131.png" target="_blank"></a>
说明:
其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。
另外在构造Disruptor的时候,在3.3.6之前使用的是API:
<a href="http://s4.51cto.com/wyfs02/M02/8B/FB/wKioL1heU5KD4yc1AABMgNq5GkY611.png" target="_blank"></a>
到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。
构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。
<a href="http://s5.51cto.com/wyfs02/M00/8B/FB/wKioL1heVOLj98krAABdMOQG3ug461.png" target="_blank"></a>
单独使用RingBuffer:WorkerPool
如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。
<code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) </code><code>throws</code> <code>InterruptedException {</code>
<code> </code><code>ExecutorService executor = Executors.newFixedThreadPool(</code><code>3</code><code>);</code>
<code> </code><code>RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,</code><code>new</code> <code>OrderFactory(),</code><code>1024</code><code>,</code><code>new</code> <code>YieldingWaitStrategy());</code>
<code> </code><code>WorkerPool<Order> workerPool = </code><code>new</code> <code>WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),</code><code>new</code> <code>IgnoreExceptionHandler(),</code><code>new</code> <code>OrderHandler());</code>
<code> </code><code>workerPool.start(executor);</code>
<code> </code><code>//-------------生产数据</code>
<code> </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i < </code><code>30</code> <code>; i++){</code>
<code> </code><code>long</code> <code>sequence = ringBuffer.next();</code>
<code> </code><code>Order order = ringBuffer.get(sequence);</code>
<code> </code><code>order.setId(i);</code>
<code> </code><code>ringBuffer.publish(sequence);</code>
<code> </code><code>System.out.println(Thread.currentThread().getName() + </code><code>" 生产者发布一条数据:"</code> <code>+ sequence + </code><code>" 订单ID:"</code> <code>+ i);</code>
<code> </code><code>Thread.sleep(</code><code>1000</code><code>);</code>
<code> </code>
<code> </code><code>workerPool.halt();</code>
<code> </code><code>executor.shutdown();</code>
实际上是利用WorkerPool辅助连接消费者。
一个生产者+多个消费者
<a href="http://s5.51cto.com/wyfs02/M01/8B/FC/wKioL1heZFugNx-wAABXx8iDTG8613.png" target="_blank"></a>
<code> </code><code>//创建订单工厂</code>
<code> </code><code>OrderFactory orderFactory = </code><code>new</code> <code>OrderFactory();</code>
<code> </code><code>//ringbuffer的大小</code>
<code> </code><code>int</code> <code>RINGBUFFER_SIZE = </code><code>1024</code><code>;</code>
<code> </code><code>//创建disruptor</code>
<code> </code><code>Disruptor<Order> disruptor = </code><code>new</code> <code>Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());</code>
<code> </code><code>//设置事件处理器 即消费者</code>
<code> </code><code>EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(</code><code>new</code> <code>OrderHandler(),</code><code>new</code> <code>OrderHandler2());</code>
<code> </code><code>eventHandlerGroup.then(</code><code>new</code> <code>OrderHandler3());</code>
<code> </code><code>disruptor.start();</code>
<code> </code><code>RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();</code>
<code> </code><code>for</code><code>(</code><code>int</code> <code>i = </code><code>0</code> <code>; i < </code><code>3</code> <code>; i++){</code>
<code> </code><code>disruptor.shutdown();</code>
<a href="http://s5.51cto.com/wyfs02/M01/8C/00/wKiom1heZJniEbQbAAA1jfoPs8I081.png" target="_blank"></a>
生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。
如果我们想顺序的按照A->B->C呢?
<code> </code><code>disruptor.handleEventsWith(</code><code>new</code> <code>Handler1()).</code>
<code> </code><code>handleEventsWith(</code><code>new</code> <code>Handler2()).</code>
<code> </code><code>handleEventsWith(</code><code>new</code> <code>Handler3());</code>
如果我们想六边形操作呢?
<a href="http://s4.51cto.com/wyfs02/M01/8B/FC/wKioL1heZe3QoLB9AACQcTxdeJ4198.png" target="_blank"></a>
<code> </code><code>Handler1 h1 = </code><code>new</code> <code>Handler1();</code>
<code> </code><code>Handler2 h2 = </code><code>new</code> <code>Handler2();</code>
<code> </code><code>Handler3 h3 = </code><code>new</code> <code>Handler3();</code>
<code> </code><code>Handler4 h4 = </code><code>new</code> <code>Handler4();</code>
<code> </code><code>Handler5 h5 = </code><code>new</code> <code>Handler5();</code>
<code> </code><code>disruptor.handleEventsWith(h1, h2);</code>
<code> </code><code>disruptor.after(h1).handleEventsWith(h4);</code>
<code> </code><code>disruptor.after(h2).handleEventsWith(h5);</code>
<code> </code><code>disruptor.after(h4, h5).handleEventsWith(h3);</code>
到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1885830,如需转载请自行联系原作者