天天看點

Disruptor筆記(三)-處理模式

常見模式:

1.UniCast a series of items between 1 publisher and 1 EventProcessor:一個publisher 一個eventprocessor

Disruptor筆記(三)-處理模式

P1  - Publisher 1

RB  - RingBuffer

SB  - SequenceBarrier

EP1 -EventProcessor 1

代碼:

private final RingBuffer<ValueEvent> ringBuffer =
        new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                                   new SingleThreadedClaimStrategy(BUFFER_SIZE),
                                   newYieldingWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = newValueAdditionEventHandler();
    private finalBatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    {
        ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
}
           

或 DSL寫法

Disruptor  disruptor = new Disruptor<TestEvent>(TestEvent.EVENT_FACTORY, executor,
                                             newSingleThreadedClaimStrategy(4),
                                             newBlockingWaitStrategy());
private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
disruptor. handleEventsWith(handler);//
disruptor.start();
           

2.Pipeline a series of messages

  P1  - Publisher 1

  RB  - RingBuffer

  SB1 - SequenceBarrier 1

  EP1 - EventProcessor 1

  SB2 - SequenceBarrier 2

  EP2 - EventProcessor 2

  SB3 - SequenceBarrier 3

  EP3 -EventProcessor 3

private final RingBuffer<FunctionEvent> ringBuffer =
        new RingBuffer<FunctionEvent>(FunctionEvent.EVENT_FACTORY,
                                      new SingleThreadedClaimStrategy(BUFFER_SIZE),
                                      new YieldingWaitStrategy());

    private final SequenceBarrier stepOneSequenceBarrier = ringBuffer.newBarrier();
    private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
    private final BatchEventProcessor<FunctionEvent> stepOneBatchProcessor =
        new BatchEventProcessor<FunctionEvent>(ringBuffer, stepOneSequenceBarrier, stepOneFunctionHandler);


    private final SequenceBarrier stepTwoSequenceBarrier = ringBuffer.newBarrier(stepOneBatchProcessor.getSequence());
    private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);
    private final BatchEventProcessor<FunctionEvent> stepTwoBatchProcessor =
        new BatchEventProcessor<FunctionEvent>(ringBuffer, stepTwoSequenceBarrier, stepTwoFunctionHandler);


    private final SequenceBarrier stepThreeSequenceBarrier = ringBuffer.newBarrier(stepTwoBatchProcessor.getSequence());
    private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);
    private final BatchEventProcessor<FunctionEvent> stepThreeBatchProcessor =
        new BatchEventProcessor<FunctionEvent>(ringBuffer, stepThreeSequenceBarrier, stepThreeFunctionHandler);
    {
        ringBuffer.setGatingSequences(stepThreeBatchProcessor.getSequence());
    }
           
Disruptor  disruptor = new Disruptor<TestEvent>(TestEvent.EVENT_FACTORY, executor,
                                             new SingleThreadedClaimStrategy(4),
                                             new BlockingWaitStrategy());
private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
EventHandlerGroup< TestEvent > group = disruptor. handleEventsWith(stepOneFunctionHandler);

private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);

private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);
disruptor.handleEventsWith(stepOneFunctionHandler);
disruptor.after(stepOneFunctionHandler).handleEventsWith(stepTwoFunctionHandler);
disruptor.after(stepTwoFunctionHandler).handleEventsWith(stepThreeFunctionHandler);

或者
Disruptor. handleEventsWith(stepOneFunctionHandler).then(stepTwoFunctionHandler).then(stepThreeFunctionHandler);
disruptor.start();
           
3.Multicast a series of messages to multiple EventProcessors
Disruptor筆記(三)-處理模式
P1  - Publisher 1 RB  - RingBuffer SB  -SequenceBarrier EP1 - EventProcessor 1 EP2 - EventProcessor 2 EP3 -EventProcessor 3
private final RingBuffer<ValueEvent> ringBuffer =
        new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                                   new SingleThreadedClaimStrategy(BUFFER_SIZE),
                                   new YieldingWaitStrategy());

    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    private final ValueMutationEventHandler[] handlers = new ValueMutationEventHandler[NUM_EVENT_PROCESSORS];
    {
        handlers[0] = new ValueMutationEventHandler(Operation.ADDITION);
        handlers[1] = new ValueMutationEventHandler(Operation.SUBTRACTION);
        handlers[2] = new ValueMutationEventHandler(Operation.AND);
    }

    private final BatchEventProcessor[] batchEventProcessors = new BatchEventProcessor[NUM_EVENT_PROCESSORS];
    {
        batchEventProcessors[0] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[0]);
        batchEventProcessors[1] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[1]);
        batchEventProcessors[2] = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handlers[2]);

        ringBuffer.setGatingSequences(batchEventProcessors[0].getSequence(),
                                      batchEventProcessors[1].getSequence(),
                                      batchEventProcessors[2].getSequence());
    }

           
Disruptor  disruptor = new Disruptor<TestEvent>(TestEvent.EVENT_FACTORY, executor,
                                             new SingleThreadedClaimStrategy(4),
                                             new BlockingWaitStrategy());
private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
EventHandlerGroup< TestEvent > group = disruptor. handleEventsWith(stepOneFunctionHandler);

private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);

private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);

disruptor. handleEventsWith(stepOneFunctionHandler ,stepTwoFunctionHandler,stepThreeFunctionHandler);
disruptor.start();
           
4.Replicate a message then fold back the results
Disruptor筆記(三)-處理模式
 P1  - Publisher 1  RB  - RingBuffer  SB1 - SequenceBarrier 1  EP1 - EventProcessor 1  EP2 - EventProcessor 2 SB2 - SequenceBarrier 2 EP3 -EventProcessor 3
private final RingBuffer<FizzBuzzEvent> ringBuffer =
        new RingBuffer<FizzBuzzEvent>(FizzBuzzEvent.EVENT_FACTORY,
                                      new SingleThreadedClaimStrategy(BUFFER_SIZE),
                                      new YieldingWaitStrategy());

    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

    private final FizzBuzzEventHandler fizzHandler = new FizzBuzzEventHandler(FizzBuzzStep.FIZZ);
    private final BatchEventProcessor<FizzBuzzEvent> batchProcessorFizz =
        new BatchEventProcessor<FizzBuzzEvent>(ringBuffer, sequenceBarrier, fizzHandler);

    private final FizzBuzzEventHandler buzzHandler = new FizzBuzzEventHandler(FizzBuzzStep.BUZZ);
    private final BatchEventProcessor<FizzBuzzEvent> batchProcessorBuzz =
        new BatchEventProcessor<FizzBuzzEvent>(ringBuffer, sequenceBarrier, buzzHandler);

    private final SequenceBarrier sequenceBarrierFizzBuzz =
        ringBuffer.newBarrier(batchProcessorFizz.getSequence(), batchProcessorBuzz.getSequence());

    private final FizzBuzzEventHandler fizzBuzzHandler = new FizzBuzzEventHandler(FizzBuzzStep.FIZZ_BUZZ);
    private final BatchEventProcessor<FizzBuzzEvent> batchProcessorFizzBuzz =
            new BatchEventProcessor<FizzBuzzEvent>(ringBuffer, sequenceBarrierFizzBuzz, fizzBuzzHandler);
    {
        ringBuffer.setGatingSequences(batchProcessorFizzBuzz.getSequence());
}
           
Disruptor  disruptor = new Disruptor<TestEvent>(TestEvent.EVENT_FACTORY, executor,
                                             new SingleThreadedClaimStrategy(4),
                                             new BlockingWaitStrategy());
private final FunctionEventHandler stepOneFunctionHandler = new FunctionEventHandler(FunctionStep.ONE);
EventHandlerGroup< TestEvent > group = disruptor. handleEventsWith(stepOneFunctionHandler);

private final FunctionEventHandler stepTwoFunctionHandler = new FunctionEventHandler(FunctionStep.TWO);

private final FunctionEventHandler stepThreeFunctionHandler = new FunctionEventHandler(FunctionStep.THREE);

disruptor. handleEventsWith(stepOneFunctionHandler ,stepTwoFunctionHandler);
disruptor.after(stepOneFunctionHandler, stepTwoFunctionHandler).handleEventsWith(stepThreeFunctionHandler);
disruptor.start();
           
5.Sequence a series of messages from multiple publishers
Disruptor筆記(三)-處理模式
 P1  - Publisher 1  P2  - Publisher 2  P3  - Publisher 3  RB  - RingBuffer  SB  - SequenceBarrier  EP1 - EventProcessor 1
private final RingBuffer<ValueEvent> ringBuffer =
        new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                                   new MultiThreadedLowContentionClaimStrategy(BUFFER_SIZE),
                                   new YieldingWaitStrategy());

    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    private final ValuePublisher[] valuePublishers = new ValuePublisher[NUM_PUBLISHERS];
    {
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            valuePublishers[i] = new ValuePublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS);
        }

        ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
    }