天天看点

Storm-源码分析- Disruptor在storm中的使用

disruptor为了更便于使用, 在2.0做了比较大的调整, 比较突出的是更换了几乎所有的概念名

老版本,

Storm-源码分析- Disruptor在storm中的使用

新版本,

Storm-源码分析- Disruptor在storm中的使用

从左到右的变化如下,

1. producer –> publisher 

2. producerbarrier被integrate到ringbuffer里面, 叫做publishport, 提供publish接口 

3. entry –> event 

4, cursor封装成sequence, 其实sequence就是将cursor+pading封装一下 

5. consumer –> eventprocesser 

6. consumerbarrier 变为dependencybarrier, 或sequencebarrier

并且对于publisher和eventprocesser, 存在claimstrategy和waitstrategy 

对于publisher的claimstrategy, 由于publisher需要先claim到sequencer才能publish: singlethreadedclaimstrategy, multithreadedclaimstrategy, 应该是对于singlethread不需要使用cas更为高效 

对于eventprocesser的waitstrategy, 当取不到数据的时候采用什么样的策略进行等待: blockingwaitstrategy, busyspinwaitstrategy, sleepingwaitstrategy, yieldingwaitstrategy 

blocking就是同步加锁, busyspin就是忙等耗cpu, 都比较低效 

yielding就是调用thread.yeild(), 把线程的从可执行状态调整成就绪装, 意思我先息下, 你们忙你们先来, 就是把cpu让给其他的线程, 但是yeild并不保证过多久线程被执行, 如果没有其他线程, 可能会被立即执行 

而sleep, 会强制线程休眠指定时间, 然后再重新调度

首先声明一组变量, 部分会在构造函数中被初始化 

最重要的结构就是ringbuffer, 这是个模板类, 这里从objecteventfactory()的实现也可以看出来, 初始化的时候在ringbuffer的每个entry上都创建一个mutableobject对象

mutableobject的实现很简单, 这是封装了object o, 为什么要做这层封装? 

为了避免java gc, 对于ringbuffer一旦初始化好, 上面的所有的mutableobject都不会被释放, 你只是去对object o, set不同的值

publish过程, 可见当前producerbarrier已经被集成到ringbuffer里面, 所以直接调用_buffer的接口 

首先调用next, claim序号 

取出序号上的mutableobject, 并将输入obj set 

最后, publish当前序号, 表示consumer可以读取 

当consumer没有start时, 会将obj cache在_cache中, 而不会放到ringbuffer中 (我没有想明白why? 为何要使用低效的链表queue来cache, 而不直接放到ringbuffer里面)

consume的过程, 这里实现的时batch consume, 即给定cursor, 会一直consume到该cursor为止

_consumer代表当前已经被consume的序号, 所以从_consumer.get() + 1开始读 

取出mutableobject中的o, 并将mutableobject 清空 

根据o的情况, 3种情况, 

    1. 如果是flush_cache对象, 将cache中的event读出调用handler.onevent 

    2. 如果是interrupt对象, 触发interruptedexception 

    3. 正常情况, 直接调用handler.onevent处理该o, curr == cursor判断表示batch是否结束, 当读到cursor的时候结束

最终将_consumer置为cursor, 表示已经读到cursor位置

创建disruptorqueue, 选用multithreadedclaimstrategy和blockingwaitstrategy

并封装一系列java接口 

最重要的工作是, 启动consume-loop 

这里ret是closeover了一个间隔为0的不停执行(consume-batch-when-available queue handler) 的线程, 而consumebatchwhenavailable的实现就是不停的sleep并调用consumebatchtocursor

并且通过consumer-started!通知其他线程consumer已经start

看看async-loop实现什么功能?  返回reify实现的record, 其中closeover了thread  这个thread主要就是死循环的执行传入的afn, 并且以afn的返回值作为执行间隔 主要功能, 异步的loop, 开启新的线程来执行loop, 而不是在当前主线程, 并且提供了sleep设置
Storm-源码分析- Disruptor在storm中的使用

 本文章摘自博客园,原文发布日期:2013-07-10