天天看點

Disruptor深入解讀

将系統性能優化到極緻,永遠是程式愛好者所努力的一個方向。在java并發領域,也有很多的實踐與創新,小到樂觀鎖、CAS,大到netty線程模型、纖程Quasar、kilim等。Disruptor是一個輕量的高性能并發架構,以驚人的吞吐量而受到廣泛的關注。Disruptor為提高程式的并發性能,提供了很多新的思路,比如:

  1. 緩存行填充,消除僞共享;
  2. RingBuffer無鎖隊列設計;
  3. 預配置設定緩存對象,使用緩存的循環覆寫取代緩存的新增删除等;

下文将從源碼角度解析Disruptor的實作原理。

1 Disruptor術語

Disruptor有很多自身的概念,使得初學者看代碼會比較費勁。是以在深入Disruptor原理之前,需要先了解一下Disruptor主要的幾個核心類或接口。

  • Sequence: 采用緩存行填充的方式對long類型的一層包裝,用以代表事件的序号。通過unsafe的cas方法進而避免了鎖的開銷;
  • Sequencer: 生産者與緩存RingBuffer之間的橋梁。單生産者與多生産者分别對應于兩個實作SingleProducerSequencer與MultiProducerSequencer。Sequencer用于向RingBuffer申請空間,使用publish方法通過waitStrategy通知所有在等待可消費事件的SequenceBarrier;
  • WaitStrategy: WaitStrategy有多種實作,用以表示當無可消費事件時,消費者的等待政策;
  • SequenceBarrier: 消費者與緩存RingBuffer之間的橋梁。消費者并不直接通路RingBuffer,進而能減少RingBuffer上的并發沖突;
  • EventProcessor: 事件處理器,是消費者線程池Executor的排程單元,是對事件處理EventHandler與異常處理ExceptionHandler等的一層封裝;
  • Event: 消費事件。Event的具體實作由使用者定義;
  • RingBuffer: 基于數組的緩存實作,也是建立sequencer與定義WaitStrategy的入口;
  • Disruptor: Disruptor的使用入口。持有RingBuffer、消費者線程池Executor、消費者集合ConsumerRepository等引用。

2 Disruptor源碼分析

2.1 Disruptor并發模型

并發領域的一個典型場景是生産者消費者模型,正常方式是使用queue作為生産者線程與消費者線程之間共享資料的方法,對于queue的讀寫避免不了讀寫鎖的競争。Disruptor使用環形緩沖區RingBuffer作為共享資料的媒介。生産者通過Sequencer控制RingBuffer,以及喚醒等待事件的消費者,消費者通過SequenceBarrier監聽RingBuffer的可消費事件。考慮一個場景,一個生産者A與三個消費者B、C、D,同時D的事件處理需要B與C先完成。則該模型結構如下:

在這個結構下,每個消費者擁有各自獨立的事件序号Sequence,消費者之間不存在共享競态。SequenceBarrier1監聽RingBuffer的序号cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序号Sequence,進而将最小的序号傳回給消費者D,由此實作了D依賴B與C的邏輯。

RingBuffer是Disruptor高性能的一個亮點。RingBuffer就是一個大數組,事件以循環覆寫的方式寫入。與正常RingBuffer擁有2個首尾指針的方式不同,Disruptor的RingBuffer隻有一個指針(或稱序号),指向數組下一個可寫入的位置,該序号在Disruptor源碼中就是Sequencer中的cursor,由生産者通過Sequencer控制RingBuffer的寫入。為了避免未消費事件的寫入覆寫,Sequencer需要監聽所有消費者的消息處理進度,也就是gatingSequences。RingBuffer通過這種方式實作了事件緩存的無鎖設計。

下面将通過分析源碼,來了解Disruptor的實作原理。

2.2 Disruptor類

Disruptor類是Disruptor架構的總入口,能用DSL的形式組織消費者之間的關系鍊,并提供擷取事件、釋出事件等方法。它包含以下屬性:

private final RingBuffer<T> ringBuffer;
/**消費者事件處理線程池**/
private final Executor executor;
/**消費者集合**/
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
/**Disruptor是否啟動标示,隻能啟動一次**/
private final AtomicBoolean started = new AtomicBoolean(false);
/**消費者事件異常處理方法**/
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();      

執行個體化Disruptor的過程,就是執行個體化RingBuffer與消費線程池Executor的過程。除此之外,Disruptor類最重要的作用是注冊消費者,handleEventsWith方法。該方法有多套實作,而每一個消費者最終都會被包裝成EventProcessor。createEventProcessors是包裝消費者的重要函數。

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,                                           final EventHandler<T>[] eventHandlers){
    checkNotStarted();
    //每個消費者有自己的事件序号Sequence
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];   
    //消費者通過SequenceBarrier等待可消費事件
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<T> eventHandler = eventHandlers[i];
        //每個消費者都以BatchEventProcessor被排程
        final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);  
        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
    
    if (processorSequences.length > 0)
    {
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
    
    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}      

從程式中可以看出,每個消費者都以BatchEventProcessor的形式被排程,也就是說,消費者的邏輯都在BatchEventProcessor。

2.3 EventProcessor

EventProcessor有兩個有操作邏輯的實作類,BatchEventProcessor與WorkProcessor,處理邏輯很相近,這邊僅分析BatchEventProcessor。

BatchEventProcessor的構造函數使用DataProvider,而不直接使用RingBuffer,可能是Disruptor考慮到留給使用者替換RingBuffer事件存儲的空間,畢竟RingBuffer是記憶體級的。

Disruptor啟動時,會調用每個消費者ConsumerInfo(在消費者集合ConsumerRepository中)的start方法,最終會運作到BatchEventProcessor的run方法。

@Override
public void run()
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();
    
    notifyStart();
    
    T event = null;
    // sequence.get()标示目前已經處理的序号
    long nextSequence = sequence.get() + 1L;
    try
    {
        while (true)
        {
            try
            {
                // sequenceBarrier最重要的作用,就是讓消費者等待下一個可用的序号
                // 可用序号可能會大于nextSequence,進而消費者可以一次處理多個事件
                // 如果該消費者同時也依賴了其他消費者,則會傳回最小的那個
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (nextSequence > availableSequence)
                {
                    Thread.yield();
                }
                
                while (nextSequence <= availableSequence)
                {
                    event = dataProvider.get(nextSequence);
                    // eventHandler是使用者定義的事件消費邏輯
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                
                // 跟蹤自己處理的事件
                sequence.set(availableSequence);
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
    finally
    {
        notifyShutdown();
        running.set(false);
    }
}      

消費者的邏輯,就是在while循環中,不斷查詢可消費事件,并由使用者自定義的消費邏輯eventHandler進行處理。查詢可消費事件的邏輯在SequenceBarrier中。

2.4 SequenceBarrier

SequenceBarrier隻有一個實作,ProcessingSequenceBarrier。下面是ProcessingSequenceBarrier的構造函數。

public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences)
{
    // 生産者的ringBuffer控制器sequencer
    this.sequencer = sequencer;
    // 消費者等待可消費事件的政策
    this.waitStrategy = waitStrategy;
    // ringBuffer的cursor
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length)
    {
        dependentSequence = cursorSequence;
    }
    else
    {
    // 當依賴其他消費者時,dependentSequence就是其他消費者的序号
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}      

消費者通過ProcessingSequenceBarrier的waitFor方法等待可消費序号,實際是調用WaitStrategy的waitFor方法。

2.5 WaitStrategy

WaitStrategy有6個實作類,用于代表6種不同的等待政策,比如阻塞政策、忙等政策等。這邊就僅分析一個阻塞政策BlockingWaitStrategy。

@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    if ((availableSequence = cursorSequence.get()) < sequence)
    {
        lock.lock();
        try
        {
            // 如果ringBuffer的cursor小于需要的序号,也就是生産者沒有新的事件發出,則阻塞消費者線程,直到生産者通過Sequencer的publish方法喚醒消費者。
            while ((availableSequence = cursorSequence.get()) < sequence)
            {
                barrier.checkAlert();
                processorNotifyCondition.await();
            }
        }
        finally
        {
            lock.unlock();
        }
    }
    
    // 如果生産者新釋出了事件,但是依賴的其他消費者還沒處理完,則等待所依賴的消費者先處理。在本文的例子中,就是等B與C先處理完,D才能處理事件。
    while ((availableSequence = dependentSequence.get()) < sequence)
    {
        barrier.checkAlert();
    }
    
    return availableSequence;
}      

到這裡,消費者的程式邏輯也就基本都清楚了。最後再看一下生産者的程式邏輯,主要是Sequencer。

2.6 Sequencer

Sequencer負責生産者對RingBuffer的控制,包括查詢是否有寫入空間、申請空間、釋出事件并喚醒消費者等。Sequencer有兩個實作SingleProducerSequencer與MultiProducerSequencer,分别對應于單生産者模型與多生産者模型。隻要看懂hasAvailableCapacity(),申請空間也就明白了。下面是SingleProducerSequencer的hasAvailableCapacity實作。

@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
    long nextValue = pad.nextValue;
    // wrapPoint是一個臨界序号,必須比目前最小的未消費序号還小
    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
    // 目前的最小未消費序号
    long cachedGatingSequence = pad.cachedValue;
    
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
    {
        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
        pad.cachedValue = minSequence;
        
        if (wrapPoint > minSequence)
        {
            return false;
        }
    }
    return true;
}      

3 Disruptor執行個體

本執行個體基于3.2.0版本的Disruptor,實作2.1小結描述的并發場景。使用Disruptor的過程非常簡單,隻需要簡單的幾步。

定義使用者事件:

public class MyEvent {
    private long value;
    
    public MyEvent(){}
    
    public long getValue() {
        return value;
    }
    
    public void setValue(long value) {
        this.value = value;
    }
}      

定義事件工廠,這是執行個體化Disruptor所需要的:

public class MyEventFactory implements EventFactory<MyEvent> {
    public MyEvent newInstance() {
        return new MyEvent();
    }
}      

定義消費者B、C、D:

public class MyEventHandlerB implements EventHandler<MyEvent> {
    public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
        System.out.println("Comsume Event B : " + myEvent.getValue());
    }
}

public class MyEventHandlerC implements EventHandler<MyEvent> {
    public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
        System.out.println("Comsume Event C : " + myEvent.getValue());
    }
}

public class MyEventHandlerD implements EventHandler<MyEvent> {
    public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
        System.out.println("Comsume Event D : " + myEvent.getValue());
    }
}      

在此基礎上,就可以運作Disruptor了:

public static void main(String[] args){
    EventFactory<MyEvent> myEventFactory = new MyEventFactory();
    Executor executor = Executors.newCachedThreadPool();
    int ringBufferSize = 32;
    
    Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(myEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
    EventHandler<MyEvent> b = new MyEventHandlerB();
    EventHandler<MyEvent> c = new MyEventHandlerC();
    EventHandler<MyEvent> d = new MyEventHandlerD();
    
    SequenceBarrier sequenceBarrier2 = disruptor.handleEventsWith(b,c).asSequenceBarrier();
    BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier2,d);
    disruptor.handleEventsWith(processord);
//  disruptor.after(b,c).handleEventsWith(d);              // 此行能代替上兩行的程式邏輯
    RingBuffer<MyEvent> ringBuffer = disruptor.start();    // 啟動Disruptor
    for(int i=0; i<10; i++) {
        long sequence = ringBuffer.next();                 // 申請位置
        try {
            MyEvent myEvent = ringBuffer.get(sequence);
            myEvent.setValue(i);                           // 放置資料
        } finally {
            ringBuffer.publish(sequence);                  // 送出,如果不送出完成事件會一直阻塞
        }
        try{
            Thread.sleep(100);
        }catch (Exception e){
        }
    }
    disruptor.shutdown();
}      

按照程式的邏輯,B與C會率先處理ringBuffer中的事件,且處理順序不分先後。同一事件被B與C處理完成之後,才會被D處理,結果如下:

Comsume Event C : 0
Comsume Event B : 0
Comsume Event D : 0
Comsume Event C : 1
Comsume Event B : 1
Comsume Event D : 1
Comsume Event C : 2
Comsume Event B : 2
Comsume Event D : 2
Comsume Event C : 3
Comsume Event B : 3
Comsume Event D : 3
Comsume Event C : 4
Comsume Event B : 4
Comsume Event D : 4
Comsume Event C : 5
Comsume Event B : 5
Comsume Event D : 5
Comsume Event C : 6
Comsume Event B : 6
Comsume Event D : 6
Comsume Event C : 7
Comsume Event B : 7
Comsume Event D : 7
Comsume Event C : 8
Comsume Event B : 8
Comsume Event D : 8
Comsume Event C : 9
Comsume Event B : 9
Comsume Event D : 9