将系統性能優化到極緻,永遠是程式愛好者所努力的一個方向。在java并發領域,也有很多的實踐與創新,小到樂觀鎖、CAS,大到netty線程模型、纖程Quasar、kilim等。Disruptor是一個輕量的高性能并發架構,以驚人的吞吐量而受到廣泛的關注。Disruptor為提高程式的并發性能,提供了很多新的思路,比如:
- 緩存行填充,消除僞共享;
- RingBuffer無鎖隊列設計;
- 預配置設定緩存對象,使用緩存的循環覆寫取代緩存的新增删除等;
下文将從源碼角度解析Disruptor的實作原理。
1 Disruptor術語
Disruptor有很多自身的概念,使得初學者看代碼會比較費勁。是以在深入Disruptor原理之前,需要先了解一下Disruptor主要的幾個核心類或接口。
- Sequence: 采用緩存行填充的方式對long類型的一層包裝,用以代表事件的序号。通過unsafe的cas方法進而避免了鎖的開銷;
- Sequencer: 生産者與緩存RingBuffer之間的橋梁。單生産者與多生産者分别對應于兩個實作SingleProducerSequencer與MultiProducerSequencer。Sequencer用于向RingBuffer申請空間,使用publish方法通過waitStrategy通知所有在等待可消費事件的SequenceBarrier;
- WaitStrategy: WaitStrategy有多種實作,用以表示當無可消費事件時,消費者的等待政策;
- SequenceBarrier: 消費者與緩存RingBuffer之間的橋梁。消費者并不直接通路RingBuffer,進而能減少RingBuffer上的并發沖突;
- EventProcessor: 事件處理器,是消費者線程池Executor的排程單元,是對事件處理EventHandler與異常處理ExceptionHandler等的一層封裝;
- Event: 消費事件。Event的具體實作由使用者定義;
- RingBuffer: 基于數組的緩存實作,也是建立sequencer與定義WaitStrategy的入口;
- Disruptor: Disruptor的使用入口。持有RingBuffer、消費者線程池Executor、消費者集合ConsumerRepository等引用。
2 Disruptor源碼分析
2.1 Disruptor并發模型
并發領域的一個典型場景是生産者消費者模型,正常方式是使用queue作為生産者線程與消費者線程之間共享資料的方法,對于queue的讀寫避免不了讀寫鎖的競争。Disruptor使用環形緩沖區RingBuffer作為共享資料的媒介。生産者通過Sequencer控制RingBuffer,以及喚醒等待事件的消費者,消費者通過SequenceBarrier監聽RingBuffer的可消費事件。考慮一個場景,一個生産者A與三個消費者B、C、D,同時D的事件處理需要B與C先完成。則該模型結構如下:
在這個結構下,每個消費者擁有各自獨立的事件序号Sequence,消費者之間不存在共享競态。SequenceBarrier1監聽RingBuffer的序号cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序号Sequence,進而将最小的序号傳回給消費者D,由此實作了D依賴B與C的邏輯。
RingBuffer是Disruptor高性能的一個亮點。RingBuffer就是一個大數組,事件以循環覆寫的方式寫入。與正常RingBuffer擁有2個首尾指針的方式不同,Disruptor的RingBuffer隻有一個指針(或稱序号),指向數組下一個可寫入的位置,該序号在Disruptor源碼中就是Sequencer中的cursor,由生産者通過Sequencer控制RingBuffer的寫入。為了避免未消費事件的寫入覆寫,Sequencer需要監聽所有消費者的消息處理進度,也就是gatingSequences。RingBuffer通過這種方式實作了事件緩存的無鎖設計。
下面将通過分析源碼,來了解Disruptor的實作原理。
2.2 Disruptor類
Disruptor類是Disruptor架構的總入口,能用DSL的形式組織消費者之間的關系鍊,并提供擷取事件、釋出事件等方法。它包含以下屬性:
private final RingBuffer<T> ringBuffer;
/**消費者事件處理線程池**/
private final Executor executor;
/**消費者集合**/
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
/**Disruptor是否啟動标示,隻能啟動一次**/
private final AtomicBoolean started = new AtomicBoolean(false);
/**消費者事件異常處理方法**/
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<T>();
執行個體化Disruptor的過程,就是執行個體化RingBuffer與消費線程池Executor的過程。除此之外,Disruptor類最重要的作用是注冊消費者,handleEventsWith方法。該方法有多套實作,而每一個消費者最終都會被包裝成EventProcessor。createEventProcessors是包裝消費者的重要函數。
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<T>[] eventHandlers){
checkNotStarted();
//每個消費者有自己的事件序号Sequence
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
//消費者通過SequenceBarrier等待可消費事件
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<T> eventHandler = eventHandlers[i];
//每個消費者都以BatchEventProcessor被排程
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
if (processorSequences.length > 0)
{
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
從程式中可以看出,每個消費者都以BatchEventProcessor的形式被排程,也就是說,消費者的邏輯都在BatchEventProcessor。
2.3 EventProcessor
EventProcessor有兩個有操作邏輯的實作類,BatchEventProcessor與WorkProcessor,處理邏輯很相近,這邊僅分析BatchEventProcessor。
BatchEventProcessor的構造函數使用DataProvider,而不直接使用RingBuffer,可能是Disruptor考慮到留給使用者替換RingBuffer事件存儲的空間,畢竟RingBuffer是記憶體級的。
Disruptor啟動時,會調用每個消費者ConsumerInfo(在消費者集合ConsumerRepository中)的start方法,最終會運作到BatchEventProcessor的run方法。
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
// sequence.get()标示目前已經處理的序号
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
// sequenceBarrier最重要的作用,就是讓消費者等待下一個可用的序号
// 可用序号可能會大于nextSequence,進而消費者可以一次處理多個事件
// 如果該消費者同時也依賴了其他消費者,則會傳回最小的那個
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (nextSequence > availableSequence)
{
Thread.yield();
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
// eventHandler是使用者定義的事件消費邏輯
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
// 跟蹤自己處理的事件
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
消費者的邏輯,就是在while循環中,不斷查詢可消費事件,并由使用者自定義的消費邏輯eventHandler進行處理。查詢可消費事件的邏輯在SequenceBarrier中。
2.4 SequenceBarrier
SequenceBarrier隻有一個實作,ProcessingSequenceBarrier。下面是ProcessingSequenceBarrier的構造函數。
public ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,final Sequence cursorSequence,final Sequence[] dependentSequences)
{
// 生産者的ringBuffer控制器sequencer
this.sequencer = sequencer;
// 消費者等待可消費事件的政策
this.waitStrategy = waitStrategy;
// ringBuffer的cursor
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
// 當依賴其他消費者時,dependentSequence就是其他消費者的序号
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
消費者通過ProcessingSequenceBarrier的waitFor方法等待可消費序号,實際是調用WaitStrategy的waitFor方法。
2.5 WaitStrategy
WaitStrategy有6個實作類,用于代表6種不同的等待政策,比如阻塞政策、忙等政策等。這邊就僅分析一個阻塞政策BlockingWaitStrategy。
@Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if ((availableSequence = cursorSequence.get()) < sequence)
{
lock.lock();
try
{
// 如果ringBuffer的cursor小于需要的序号,也就是生産者沒有新的事件發出,則阻塞消費者線程,直到生産者通過Sequencer的publish方法喚醒消費者。
while ((availableSequence = cursorSequence.get()) < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
}
// 如果生産者新釋出了事件,但是依賴的其他消費者還沒處理完,則等待所依賴的消費者先處理。在本文的例子中,就是等B與C先處理完,D才能處理事件。
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}
return availableSequence;
}
到這裡,消費者的程式邏輯也就基本都清楚了。最後再看一下生産者的程式邏輯,主要是Sequencer。
2.6 Sequencer
Sequencer負責生産者對RingBuffer的控制,包括查詢是否有寫入空間、申請空間、釋出事件并喚醒消費者等。Sequencer有兩個實作SingleProducerSequencer與MultiProducerSequencer,分别對應于單生産者模型與多生産者模型。隻要看懂hasAvailableCapacity(),申請空間也就明白了。下面是SingleProducerSequencer的hasAvailableCapacity實作。
@Override
public boolean hasAvailableCapacity(final int requiredCapacity)
{
long nextValue = pad.nextValue;
// wrapPoint是一個臨界序号,必須比目前最小的未消費序号還小
long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
// 目前的最小未消費序号
long cachedGatingSequence = pad.cachedValue;
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
pad.cachedValue = minSequence;
if (wrapPoint > minSequence)
{
return false;
}
}
return true;
}
3 Disruptor執行個體
本執行個體基于3.2.0版本的Disruptor,實作2.1小結描述的并發場景。使用Disruptor的過程非常簡單,隻需要簡單的幾步。
定義使用者事件:
public class MyEvent {
private long value;
public MyEvent(){}
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
定義事件工廠,這是執行個體化Disruptor所需要的:
public class MyEventFactory implements EventFactory<MyEvent> {
public MyEvent newInstance() {
return new MyEvent();
}
}
定義消費者B、C、D:
public class MyEventHandlerB implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event B : " + myEvent.getValue());
}
}
public class MyEventHandlerC implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event C : " + myEvent.getValue());
}
}
public class MyEventHandlerD implements EventHandler<MyEvent> {
public void onEvent(MyEvent myEvent, long l, boolean b) throws Exception {
System.out.println("Comsume Event D : " + myEvent.getValue());
}
}
在此基礎上,就可以運作Disruptor了:
public static void main(String[] args){
EventFactory<MyEvent> myEventFactory = new MyEventFactory();
Executor executor = Executors.newCachedThreadPool();
int ringBufferSize = 32;
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(myEventFactory,ringBufferSize,executor, ProducerType.SINGLE,new BlockingWaitStrategy());
EventHandler<MyEvent> b = new MyEventHandlerB();
EventHandler<MyEvent> c = new MyEventHandlerC();
EventHandler<MyEvent> d = new MyEventHandlerD();
SequenceBarrier sequenceBarrier2 = disruptor.handleEventsWith(b,c).asSequenceBarrier();
BatchEventProcessor processord = new BatchEventProcessor(disruptor.getRingBuffer(),sequenceBarrier2,d);
disruptor.handleEventsWith(processord);
// disruptor.after(b,c).handleEventsWith(d); // 此行能代替上兩行的程式邏輯
RingBuffer<MyEvent> ringBuffer = disruptor.start(); // 啟動Disruptor
for(int i=0; i<10; i++) {
long sequence = ringBuffer.next(); // 申請位置
try {
MyEvent myEvent = ringBuffer.get(sequence);
myEvent.setValue(i); // 放置資料
} finally {
ringBuffer.publish(sequence); // 送出,如果不送出完成事件會一直阻塞
}
try{
Thread.sleep(100);
}catch (Exception e){
}
}
disruptor.shutdown();
}
按照程式的邏輯,B與C會率先處理ringBuffer中的事件,且處理順序不分先後。同一事件被B與C處理完成之後,才會被D處理,結果如下:
Comsume Event C : 0
Comsume Event B : 0
Comsume Event D : 0
Comsume Event C : 1
Comsume Event B : 1
Comsume Event D : 1
Comsume Event C : 2
Comsume Event B : 2
Comsume Event D : 2
Comsume Event C : 3
Comsume Event B : 3
Comsume Event D : 3
Comsume Event C : 4
Comsume Event B : 4
Comsume Event D : 4
Comsume Event C : 5
Comsume Event B : 5
Comsume Event D : 5
Comsume Event C : 6
Comsume Event B : 6
Comsume Event D : 6
Comsume Event C : 7
Comsume Event B : 7
Comsume Event D : 7
Comsume Event C : 8
Comsume Event B : 8
Comsume Event D : 8
Comsume Event C : 9
Comsume Event B : 9
Comsume Event D : 9