消费者 c3 也许是你的业务逻辑。消费者 c1 可能在备份接收到的数据,而消费者 c2 可能在准备数据或者别的东西。
(为什么单词 queue 里必须有这么多 “e” 呢?这是我在画这些图时遇到的最麻烦的词)。
你也许从这里看到了问题的端倪:一条消息从 p1 传输到 c3 要完整的穿过四个队列,每个队列在消息进入队列和取出队列时都会产生消耗成本。
这张图看起来更复杂。不过所有的参与者都只依赖 ring buffer 作为一个单独的联结点,而且所有的交互都是基于 barrier 对象与检查依赖的目标序号来实现的。
中描述过的单生产者模型。有趣的是,生产者并不需要关心所有的消费者。它只关心消费者 c3,如果消费者 c3 处理完了 ring buffer
的某一个节点,那么另外两个消费者肯定也处理完了。因此,只要 c3 的位置向前移动,ring buffer 的后续节点就会空闲出来。
管理消费者的依赖关系需要两个 consumerbarrier 对象。第一个仅仅与 ring buffer 交互,c1 和 c2
消费者向它申请下一个可访问节点。第二个 consumerbarrier 只知道消费者 c1 和
c2,它返回两个消费者访问过的消息序号中较小的那个。
hmmm。我想需要一个例子。
我们从这个故事发生到一半的时候来看:生产者 p1 已经在 ring buffer 里写到序号 22 了,消费者 c1
已经访问和处理完了序号 21 之前的所有数据。消费者 c2 处理到了序号 18。消费者 c3,就是依赖其他消费者的那个,才处理到序号 15。
生产者 p1 不能继续向 ringbuffer 写入数据了,因为序号 15 占据了我们想要写入序号 23 的数据节点 (slot)。
(抱歉,我真的试过用其他颜色来代替红色和绿色,但是别的都更容易混淆。)
第一个 consumerbarrier(cb1)告诉 c1 和 c2 消费者可以去访问序号 22 前面的所有数据,这是 ring
buffer 中的最大序号。第二个 consumerbarrier (cb2) 不但会检查 ringbuffer
的序号,也会检查另外两个消费者的序号并且返回它们之间的最小值。因此,三号消费者被告知可以访问 ring buffer 里序号 18 前面的数据。
注意这些消费者还是直接从 ring buffer 拿数据节点——并不是由 c1 和 c2 消费者把数据节点从 ring buffer
里取出再传递给 c3 消费者的。作为替代的是,由第二个 consumerbarrier 告诉 c3 消费者,在 ringbuffer
里的哪些节点可以安全的处理。
这产生了一个问题——如果任何数据都来自于 ring buffer,那么 c3 消费者如何读到前面两个消费者处理完成的数据呢?如果 c3
消费者关心的只是先前的消费者是否已经完成它们的工作(例如,把数据复制到别的地方),那么这一切都没有问题—— c3
消费者知道工作已完成就放心了。但是,如果 c3 消费者需要访问先前的消费者的处理结果,它又从哪里去获取呢?
秘密在于把处理结果写入 ring buffer 数据节点 (entry) 本身。这样,当 c3 消费者从 ring buffer 取出节点时,它已经填充好了 c3 消费者工作需要的所有信息。这里 真正 重要的地方是节点 (entry) 对象的每一个字段应该只允许一个消费者写入。这可以避免产生并发写入冲突 (write-contention) 减慢了整个处理过程。
有两个字段:fizz 和 buzz。如果消费者是 fizz consumer, 它只写入字段 fizz。如果是 buzz consumer,
它只写入字段 buzz。第三个消费者 fizzbuzz,它只去读这两个字段但是不会做写入,因为读没问题,不会引起争用。
这一切看起来都要比队列实现更复杂。是的,它涉及到更多的内部协调。但是这些细节对于消费者和生产者是隐藏的,它们只和 barrier 对象交互。诀窍在消费者结构里。上文例子中提到的菱形结构可以用下面的方法创建:
<code>01</code>
<code>consumerbarrier consumerbarrier1 =</code>
<code>02</code>
<code> </code><code>ringbuffer.createconsumerbarrier();</code>
<code>03</code>
<code>batchconsumer consumer1 =</code>
<code>04</code>
<code> </code><code>new</code> <code>batchconsumer(consumerbarrier1, handler1);</code>
<code>05</code>
<code>batchconsumer consumer2 =</code>
<code>06</code>
<code> </code><code>new</code> <code>batchconsumer(consumerbarrier1, handler2);</code>
<code>07</code>
<code>consumerbarrier consumerbarrier2 =</code>
<code>08</code>
<code> </code><code>ringbuffer.createconsumerbarrier(consumer1, consumer2);</code>
<code>09</code>
<code>batchconsumer consumer3 =</code>
<code>10</code>
<code> </code><code>new</code> <code>batchconsumer(consumerbarrier2, handler3);</code>
<code>11</code>
<code>producerbarrier producerbarrier =</code>
<code>12</code>
<code> </code><code>ringbuffer.createproducerbarrier(consumer3);</code>
现在你知道了——如何关联 disruptor 与相互依赖(等待)的多个消费者。关键点是:
使用多个 consumerbarrier 来管理消费者之间的依赖(等待)关系。
使用 producerbarrier 监视结构图中最后一个消费者。
只允许一个消费者更新数据节点 (entry) 的每一个独立字段。