常見模式:
1. Unicast a series of items between 1 publisher and 1 EventProcessor:一個 publisher、一個 EventProcessor
P1 - Publisher 1
RB - RingBuffer
SB - SequenceBarrier
EP1 - EventProcessor 1
代碼:
// Unicast wiring: one publisher -> ring buffer -> one event processor.
// These are class members; BUFFER_SIZE is expected to be defined by the enclosing class.
private final RingBuffer<ValueEvent> ringBuffer =
    new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                               new SingleThreadedClaimStrategy(BUFFER_SIZE),
                               new YieldingWaitStrategy());

// Barrier created with no dependent sequences.
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
private final BatchEventProcessor<ValueEvent> batchEventProcessor =
    new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);

{
    // Instance initializer: register the processor's sequence as the ring buffer's gating sequence.
    ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
}
或 DSL 寫法:
// DSL form of the unicast wiring. These statements belong inside a method body;
// `executor` must be provided by the surrounding code.
// The event type is ValueEvent because ValueAdditionEventHandler is used with
// BatchEventProcessor<ValueEvent> in the hand-wired version.
Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor,
        new SingleThreadedClaimStrategy(4),
        new BlockingWaitStrategy());
final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();

// Register the single consumer, then start the disruptor.
disruptor.handleEventsWith(handler);
disruptor.start();
2.Pipeline a series of messages
P1 - Publisher 1
RB - RingBuffer
SB1 - SequenceBarrier 1
EP1 - EventProcessor 1
SB2 - SequenceBarrier 2
EP2 - EventProcessor 2
SB3 - SequenceBarrier 3
EP3 - EventProcessor 3
// Three-stage pipeline: each stage's barrier is built from the previous stage's sequence.

// One handler per pipeline step.
private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);
private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);

private final RingBuffer<FunctionEvent> ringBuffer = new RingBuffer<FunctionEvent>(
        FunctionEvent.EVENT_FACTORY,
        new SingleThreadedClaimStrategy(BUFFER_SIZE),
        new YieldingWaitStrategy());

// Stage 1: barrier with no dependent sequences.
private final SequenceBarrier stepOneSequenceBarrier = ringBuffer.newBarrier();
private final BatchEventProcessor<FunctionEvent> stepOneBatchProcessor = new BatchEventProcessor<FunctionEvent>(
        ringBuffer, stepOneSequenceBarrier, stepOneFunctionHandler);

// Stage 2: barrier built from stage 1's sequence.
private final SequenceBarrier stepTwoSequenceBarrier =
        ringBuffer.newBarrier(stepOneBatchProcessor.getSequence());
private final BatchEventProcessor<FunctionEvent> stepTwoBatchProcessor = new BatchEventProcessor<FunctionEvent>(
        ringBuffer, stepTwoSequenceBarrier, stepTwoFunctionHandler);

// Stage 3: barrier built from stage 2's sequence.
private final SequenceBarrier stepThreeSequenceBarrier =
        ringBuffer.newBarrier(stepTwoBatchProcessor.getSequence());
private final BatchEventProcessor<FunctionEvent> stepThreeBatchProcessor = new BatchEventProcessor<FunctionEvent>(
        ringBuffer, stepThreeSequenceBarrier, stepThreeFunctionHandler);

{
    // Only the last stage's sequence is registered as the gating sequence.
    ringBuffer.setGatingSequences(stepThreeBatchProcessor.getSequence());
}
// DSL form of the three-stage pipeline. These statements belong inside a method body;
// `executor` must be provided by the surrounding code.
final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);
final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);

// The event type is FunctionEvent because FunctionEventHandler is used with
// BatchEventProcessor<FunctionEvent> in the hand-wired version.
Disruptor<FunctionEvent> disruptor = new Disruptor<FunctionEvent>(FunctionEvent.EVENT_FACTORY, executor,
        new SingleThreadedClaimStrategy(4),
        new BlockingWaitStrategy());

// Each handler is registered exactly once; after(...) chains the next step behind the previous one.
disruptor.handleEventsWith(stepOneFunctionHandler);
disruptor.after(stepOneFunctionHandler).handleEventsWith(stepTwoFunctionHandler);
disruptor.after(stepTwoFunctionHandler).handleEventsWith(stepThreeFunctionHandler);
或者
// Fluent alternative to the separate handleEventsWith/after registrations: use one form or the other, not both.
// handleEventsWith is called on the `disruptor` instance, not on the Disruptor class.
disruptor.handleEventsWith(stepOneFunctionHandler).then(stepTwoFunctionHandler).then(stepThreeFunctionHandler);
disruptor.start();
3.Multicast a series of messages to multiple EventProcessors
P1 - Publisher 1
RB - RingBuffer
SB - SequenceBarrier
EP1 - EventProcessor 1
EP2 - EventProcessor 2
EP3 - EventProcessor 3
// Multicast wiring: three processors share one barrier and each sees the events from the ring buffer.
private final RingBuffer<ValueEvent> ringBuffer = new RingBuffer<ValueEvent>(
        ValueEvent.EVENT_FACTORY,
        new SingleThreadedClaimStrategy(BUFFER_SIZE),
        new YieldingWaitStrategy());

// A single barrier is passed to every processor below.
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

private final ValueMutationEventHandler[] handlers = new ValueMutationEventHandler[NUM_EVENT_PROCESSORS];
private final BatchEventProcessor[] batchEventProcessors = new BatchEventProcessor[NUM_EVENT_PROCESSORS];

{
    // One handler per operation.
    handlers[0] = new ValueMutationEventHandler(Operation.ADDITION);
    handlers[1] = new ValueMutationEventHandler(Operation.SUBTRACTION);
    handlers[2] = new ValueMutationEventHandler(Operation.AND);

    // One processor per handler, all reading through the shared barrier.
    batchEventProcessors[0] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[0]);
    batchEventProcessors[1] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[1]);
    batchEventProcessors[2] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[2]);

    // Every processor's sequence is registered as a gating sequence.
    ringBuffer.setGatingSequences(
            batchEventProcessors[0].getSequence(),
            batchEventProcessors[1].getSequence(),
            batchEventProcessors[2].getSequence());
}
// DSL form of the multicast wiring. These statements belong inside a method body;
// `executor` must be provided by the surrounding code.
final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);
final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);

// The event type must match the handlers' event type; FunctionEventHandler is assumed to
// handle FunctionEvent (as in the pipeline example) - confirm against its declaration.
Disruptor<FunctionEvent> disruptor = new Disruptor<FunctionEvent>(FunctionEvent.EVENT_FACTORY, executor,
        new SingleThreadedClaimStrategy(4),
        new BlockingWaitStrategy());

// All three handlers are registered in a single call, each exactly once.
disruptor.handleEventsWith(stepOneFunctionHandler, stepTwoFunctionHandler, stepThreeFunctionHandler);
disruptor.start();
4.Replicate a message then fold back the results
P1 - Publisher 1
RB - RingBuffer
SB1 - SequenceBarrier 1
EP1 - EventProcessor 1
EP2 - EventProcessor 2
SB2 - SequenceBarrier 2
EP3 - EventProcessor 3
// Diamond wiring: the fizz and buzz processors share the first barrier; the fizz-buzz
// processor reads through a second barrier built from both of their sequences.

// Handlers for the two first-stage branches and the final stage.
private final FizzBuzzEventHandler fizzHandler = new FizzBuzzEventHandler(FizzBuzzStep.FIZZ);
private final FizzBuzzEventHandler buzzHandler = new FizzBuzzEventHandler(FizzBuzzStep.BUZZ);
private final FizzBuzzEventHandler fizzBuzzHandler = new FizzBuzzEventHandler(FizzBuzzStep.FIZZ_BUZZ);

private final RingBuffer<FizzBuzzEvent> ringBuffer = new RingBuffer<FizzBuzzEvent>(
        FizzBuzzEvent.EVENT_FACTORY,
        new SingleThreadedClaimStrategy(BUFFER_SIZE),
        new YieldingWaitStrategy());

// First stage: both branches use the same barrier.
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
private final BatchEventProcessor<FizzBuzzEvent> batchProcessorFizz = new BatchEventProcessor<FizzBuzzEvent>(
        ringBuffer, sequenceBarrier, fizzHandler);
private final BatchEventProcessor<FizzBuzzEvent> batchProcessorBuzz = new BatchEventProcessor<FizzBuzzEvent>(
        ringBuffer, sequenceBarrier, buzzHandler);

// Second stage: barrier built from the sequences of both first-stage processors.
private final SequenceBarrier sequenceBarrierFizzBuzz = ringBuffer.newBarrier(
        batchProcessorFizz.getSequence(),
        batchProcessorBuzz.getSequence());
private final BatchEventProcessor<FizzBuzzEvent> batchProcessorFizzBuzz = new BatchEventProcessor<FizzBuzzEvent>(
        ringBuffer, sequenceBarrierFizzBuzz, fizzBuzzHandler);

{
    // Only the final processor's sequence is registered as the gating sequence.
    ringBuffer.setGatingSequences(batchProcessorFizzBuzz.getSequence());
}
// DSL form of the diamond wiring. These statements belong inside a method body;
// `executor` must be provided by the surrounding code.
final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);
final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);

// The event type must match the handlers' event type; FunctionEventHandler is assumed to
// handle FunctionEvent (as in the pipeline example) - confirm against its declaration.
Disruptor<FunctionEvent> disruptor = new Disruptor<FunctionEvent>(FunctionEvent.EVENT_FACTORY, executor,
        new SingleThreadedClaimStrategy(4),
        new BlockingWaitStrategy());

// Steps one and two are registered together, each exactly once.
disruptor.handleEventsWith(stepOneFunctionHandler, stepTwoFunctionHandler);
// Step three is registered behind both of them.
disruptor.after(stepOneFunctionHandler, stepTwoFunctionHandler).handleEventsWith(stepThreeFunctionHandler);
disruptor.start();
5.Sequence a series of messages from multiple publishers
P1 - Publisher 1
P2 - Publisher 2
P3 - Publisher 3
RB - RingBuffer
SB - SequenceBarrier
EP1 - EventProcessor 1
// Multi-publisher wiring: several ValuePublisher instances feed one ring buffer read by a single processor.
private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();

// Uses a multi-threaded claim strategy for the ring buffer.
private final RingBuffer<ValueEvent> ringBuffer = new RingBuffer<ValueEvent>(
        ValueEvent.EVENT_FACTORY,
        new MultiThreadedLowContentionClaimStrategy(BUFFER_SIZE),
        new YieldingWaitStrategy());

private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(
        ringBuffer, sequenceBarrier, handler);

private final ValuePublisher[] valuePublishers = new ValuePublisher[NUM_PUBLISHERS];

{
    // Each publisher is constructed with ITERATIONS / NUM_PUBLISHERS as its third argument.
    int slot = 0;
    while (slot < NUM_PUBLISHERS) {
        valuePublishers[slot] = new ValuePublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS);
        slot++;
    }

    // The single processor's sequence is registered as the gating sequence.
    ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
}