千家信息网

How Disruptor Implements a High-Performance Queue

Published: 2025-02-01 Author: 千家信息网 editorial staff

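Many newcomers are unsure how Disruptor implements a high-performance queue. This article walks through a small example and then reads the relevant source code, following an event from initialization through consumption and publication.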

Disruptor example

    import java.util.concurrent.ThreadFactory

    import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
    import com.lmax.disruptor.{BlockingWaitStrategy, EventFactory, EventHandler, EventTranslatorOneArg, WaitStrategy}

    object DisruptorTest {
      val disruptor = {
        // Factory used to pre-fill every slot of the ring buffer
        val factory = new EventFactory[Event] {
          override def newInstance(): Event = Event(-1)
        }
        val threadFactory = new ThreadFactory() {
          override def newThread(r: Runnable): Thread = new Thread(r)
        }

        // Ring buffer size 4, single producer, blocking wait strategy
        val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE,
          new BlockingWaitStrategy())
        // ThenHandler only sees an event after TestHandler has handled it
        disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)

        disruptor
      }

      val translator = new EventTranslatorOneArg[Event, Int]() {
        override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
          event.id = arg
          println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
        }
      }

      def main(args: Array[String]): Unit = {
        disruptor.start()
        (0 until 100).foreach { i =>
          disruptor.publishEvent(translator, i)
        }
        disruptor.shutdown()
      }
    }

    case class Event(var id: Int) {
      override def toString: String = s"event: ${id}"
    }

    object TestHandler extends EventHandler[Event] {
      override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
        println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
      }
    }

    object ThenHandler extends EventHandler[Event] {
      override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
        println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
      }
    }

Reading the source

Disruptor initialization

Start with the Disruptor constructor:

    public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final ThreadFactory threadFactory,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             new BasicExecutor(threadFactory));
    }

Next, look at RingBuffer.create. It ends up in the fill method, which pre-populates every slot of the ringBuffer with a default instance produced by eventFactory.newInstance():

    public static <E> RingBuffer<E> create(ProducerType producerType,
                                           EventFactory<E> factory,
                                           int bufferSize,
                                           WaitStrategy waitStrategy) {
        switch (producerType) {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                         int bufferSize,
                                                         WaitStrategy waitStrategy) {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        // Power-of-2 size lets "sequence & indexMask" replace a modulo
        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
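The two ideas in this constructor, pre-filling the slots and requiring a power-of-2 size so that a bit mask can stand in for a modulo, can be tried out in isolation. The following is a minimal sketch using only the JDK; the class name PreallocatedRing and its use of Supplier are assumptions made for illustration, and it leaves out the padding (BUFFER_PAD) and the sequencer that the real RingBuffer uses.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified ring: illustrates pre-filling and index masking only. */
public class PreallocatedRing<E> {
    private final Object[] entries;
    private final int indexMask;

    public PreallocatedRing(Supplier<E> factory, int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries = new Object[bufferSize];
        // Allocate every event once, up front; slots are reused afterwards.
        for (int i = 0; i < bufferSize; i++) {
            entries[i] = factory.get();
        }
    }

    /** Maps an ever-growing sequence onto a slot with a bit mask. */
    @SuppressWarnings("unchecked")
    public E get(long sequence) {
        return (E) entries[(int) (sequence & indexMask)];
    }

    public static void main(String[] args) {
        PreallocatedRing<StringBuilder> ring = new PreallocatedRing<>(StringBuilder::new, 4);
        ring.get(1).append("first lap");
        // Sequence 5 wraps onto the same slot as sequence 1 (5 & 3 == 1).
        System.out.println(ring.get(5) == ring.get(1)); // prints true
        System.out.println(ring.get(5));                // prints first lap
    }
}
```

Because the slots are reused on every lap, a producer overwrites fields of an existing object instead of allocating a new one for each event.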

Consuming events

Start with disruptor.start(), the entry point for consuming events:

    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

consumerRepository is populated by disruptor.handleEventsWith(TestHandler), which also builds the event-processing chain:

    public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

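Now go back to consumerInfo.start(executor) in disruptor.start(). Here executor = new BasicExecutor(threadFactory), and BasicExecutor creates a new thread every time it executes a task. Because the number of consumers in consumerRepository is bounded, creating one thread per consumer is not a problem.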

    public Disruptor(
            final EventFactory<T> eventFactory,
            final int ringBufferSize,
            final ThreadFactory threadFactory,
            final ProducerType producerType,
            final WaitStrategy waitStrategy) {
        this(
            RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
            new BasicExecutor(threadFactory));
    }

    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }

    // ConsumerInfo implementation invoked by disruptor.start()
    @Override
    public void start(final java.util.concurrent.Executor executor) {
        // EventProcessor extends Runnable
        // executor = BasicExecutor
        executor.execute(eventprocessor);
    }

    public final class BatchEventProcessor<T> implements EventProcessor {
        @Override
        public void run() {
            if (running.compareAndSet(IDLE, RUNNING)) {
                sequenceBarrier.clearAlert();
                notifyStart();
                try {
                    if (running.get() == RUNNING) {
                        processEvents();
                    }
                } finally {
                    notifyShutdown();
                    running.set(IDLE);
                }
            } else {
                if (running.get() == RUNNING) {
                    throw new IllegalStateException("Thread is already running");
                } else {
                    earlyExit();
                }
            }
        }
    }

    // BatchEventProcessor.processEvents
    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;
        while (true) {
            try {
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                // Handle everything that is available in one batch
                while (nextSequence <= availableSequence) {
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

executor.execute, that is BasicExecutor.execute(eventprocessor), runs the processor asynchronously on its own thread, which means BatchEventProcessor.run is invoked there and drives the eventHandler.
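The thread-per-task behaviour described here is easy to reproduce with the JDK alone. The sketch below is not the BasicExecutor source; ThreadPerTaskExecutor, startedThreads, and runAndCount are hypothetical names used only to show what "a new thread for every execute call" looks like.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a thread-per-task executor such as BasicExecutor. */
public class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory factory;
    private final AtomicInteger started = new AtomicInteger();

    public ThreadPerTaskExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        // No pooling: every task gets a brand-new thread.
        Thread thread = factory.newThread(command);
        if (thread == null) {
            throw new IllegalStateException("ThreadFactory returned null");
        }
        started.incrementAndGet();
        thread.start();
    }

    public int startedThreads() {
        return started.get();
    }

    /** Runs the given number of trivial tasks and returns how many threads were started. */
    static int runAndCount(int tasks) {
        ThreadPerTaskExecutor executor = new ThreadPerTaskExecutor(Thread::new);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.startedThreads();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(2)); // prints 2
    }
}
```

With one long-running processor per handler, the thread count equals the number of registered consumers, which is why the lack of pooling is acceptable in this setting.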

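That raises a question: if every processor runs asynchronously, how do multiple eventHandlers handle events in order?

Look at what processEvents does:

  1. Read BatchEventProcessor.sequence and add 1 to get the next sequence to handle.

  2. Call sequenceBarrier.waitFor, which delegates to WaitStrategy.waitFor, to obtain availableSequence, the highest sequence that may be consumed.

  3. Here is the implementation of BlockingWaitStrategy.waitFor: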

        public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
                            SequenceBarrier barrier)
                throws AlertException, InterruptedException {
            long availableSequence;
            if (cursorSequence.get() < sequence) {
                lock.lock();
                try {
                    // Block until the producer has published the requested sequence
                    while (cursorSequence.get() < sequence) {
                        barrier.checkAlert();
                        processorNotifyCondition.await();
                    }
                } finally {
                    lock.unlock();
                }
            }
            // Spin until the upstream consumers have also reached it
            while ((availableSequence = dependentSequence.get()) < sequence) {
                barrier.checkAlert();
            }
            return availableSequence;
        }


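    If cursorSequence (the ring buffer's published index) is less than sequence (the index the batchEventProcessor is asking for), the batchEventProcessor blocks and waits for the producer. Once the cursor has caught up, it spins until dependentSequence has also reached sequence and returns that value as availableSequence. The batchEventProcessor then handles every event up to and including availableSequence in one batch and updates its own sequence.

  4. dependentSequence is initialized in the ProcessingSequenceBarrier constructor: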

        final class ProcessingSequenceBarrier implements SequenceBarrier {
            private final WaitStrategy waitStrategy;
            private final Sequence dependentSequence;
            private volatile boolean alerted = false;
            private final Sequence cursorSequence;
            private final Sequencer sequencer;

            ProcessingSequenceBarrier(final Sequencer sequencer,
                                      final WaitStrategy waitStrategy,
                                      final Sequence cursorSequence,
                                      final Sequence[] dependentSequences) {
                this.sequencer = sequencer;
                this.waitStrategy = waitStrategy;
                this.cursorSequence = cursorSequence;
                if (0 == dependentSequences.length) {
                    // No upstream consumer: depend directly on the producer cursor
                    dependentSequence = cursorSequence;
                } else {
                    dependentSequence = new FixedSequenceGroup(dependentSequences);
                }
            }
        }


    The ProcessingSequenceBarrier is created in Disruptor.createEventProcessors by final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences). In the code shown here, createEventProcessors is called only from Disruptor.handleEventsWith and EventHandlerGroup.handleEventsWith:

        public class Disruptor<T> {
            public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
                // First stage: no upstream sequences to wait on
                return createEventProcessors(new Sequence[0], handlers);
            }

            EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                                       final EventHandler<? super T>[] eventHandlers) {
                checkNotStarted();
                final Sequence[] processorSequences = new Sequence[eventHandlers.length];
                final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
                for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
                    final EventHandler<? super T> eventHandler = eventHandlers[i];
                    final BatchEventProcessor<T> batchEventProcessor =
                        new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
                    if (exceptionHandler != null) {
                        batchEventProcessor.setExceptionHandler(exceptionHandler);
                    }
                    consumerRepository.add(batchEventProcessor, eventHandler, barrier);
                    processorSequences[i] = batchEventProcessor.getSequence();
                }
                updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
                return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
            }
        }

        public class EventHandlerGroup<T> {
            private final Disruptor<T> disruptor;
            private final ConsumerRepository<T> consumerRepository;
            private final Sequence[] sequences;

            EventHandlerGroup(final Disruptor<T> disruptor,
                              final ConsumerRepository<T> consumerRepository,
                              final Sequence[] sequences) {
                this.disruptor = disruptor;
                this.consumerRepository = consumerRepository;
                this.sequences = Arrays.copyOf(sequences, sequences.length);
            }

            public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
                // Later stage: wait on the sequences of this group's processors
                return disruptor.createEventProcessors(sequences, handlers);
            }

            public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
                return handleEventsWith(handlers);
            }
        }


    EventHandlerGroup keeps a copy of the sequences of its batchEventProcessors. In the demo, disruptor.handleEventsWith(TestHandler).then(ThenHandler) uses then to hand TestHandler's sequence to the barrier built for ThenHandler. ThenHandler therefore depends on TestHandler and handles an event only after TestHandler has handled it.
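The ordering guarantee traced in the steps above comes down to one rule: a consumer never advances past the sequence it depends on. The sketch below models that rule with plain AtomicLong counters and busy-waiting. It is an illustration under simplifying assumptions, not Disruptor code: the names DependentConsumers, consumer, and run are hypothetical, there is no ring buffer wrap-around, and the wait is a spin instead of a WaitStrategy.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical model of producer -> first handler -> second handler gating. */
public class DependentConsumers {

    /** Builds a consumer thread that handles 0..last without passing dependsOn. */
    static Thread consumer(AtomicLong dependsOn, AtomicLong own, long last, LongConsumer handler) {
        return new Thread(() -> {
            long next = own.get() + 1;
            while (next <= last) {
                long available;
                // Wait until the upstream sequence has reached "next".
                while ((available = dependsOn.get()) < next) {
                    Thread.yield();
                }
                // Handle the whole available batch, then publish own progress.
                for (; next <= available; next++) {
                    handler.accept(next);
                }
                own.set(available);
            }
        });
    }

    /** Returns how many events the second handler saw before the first handler did. */
    static int run(int events) {
        AtomicLong cursor = new AtomicLong(-1);    // producer progress
        AtomicLong firstSeq = new AtomicLong(-1);  // first handler progress
        AtomicLong secondSeq = new AtomicLong(-1); // second handler progress
        boolean[] seenByFirst = new boolean[events];
        AtomicInteger violations = new AtomicInteger();

        Thread first = consumer(cursor, firstSeq, events - 1, s -> seenByFirst[(int) s] = true);
        Thread second = consumer(firstSeq, secondSeq, events - 1, s -> {
            if (!seenByFirst[(int) s]) {
                violations.incrementAndGet();
            }
        });
        first.start();
        second.start();
        for (long s = 0; s < events; s++) {
            cursor.set(s); // "publish" sequence s
        }
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return violations.get();
    }

    public static void main(String[] args) {
        System.out.println("violations: " + run(1000)); // prints violations: 0
    }
}
```

The first consumer waits on the producer cursor, the second waits on the first consumer's sequence, which mirrors how an empty barrierSequences array falls back to cursorSequence while a then stage receives the previous stage's sequences.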

Producing events

Next, look at disruptor.publishEvent(translator, i), which puts data into the ringBuffer:

    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
        // Claim the next slot
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0);
    }

    private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
        try {
            translator.translateTo(get(sequence), sequence, arg0);
        } finally {
            // Always publish, so a failing translator cannot stall consumers
            sequencer.publish(sequence);
        }
    }

    public E get(long sequence) {
        return elementAt(sequence);
    }

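get(sequence) returns the object stored in the ring buffer array at the slot for sequence (the ring buffer index). translator updates that pre-allocated object in place, so the slot now holds the new data. publish then advances the cursor, and waitStrategy.signalAllWhenBlocking() wakes any consumers that are blocked so they can continue consuming: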

    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    @Override
    public void publish(long sequence) {
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
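The handshake between publish and a blocked consumer can be sketched with a lock and a condition from java.util.concurrent. This is a hedged illustration of the pattern, not the library's implementation: BlockingCursor, its fields, and the demo helper are hypothetical names, and alert handling and dependent sequences are left out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical cursor with a blocking wait, in the spirit of a blocking wait strategy. */
public class BlockingCursor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private volatile long cursor = -1;

    /** Producer side: advance the cursor, then wake every blocked consumer. */
    public void publish(long sequence) {
        cursor = sequence;
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: block until the cursor has reached the requested sequence. */
    public long waitFor(long sequence) throws InterruptedException {
        if (cursor < sequence) {
            lock.lock();
            try {
                // Re-check under the lock so a concurrent publish cannot be missed.
                while (cursor < sequence) {
                    published.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return cursor;
    }

    /** Publishes 0..target while a consumer thread waits for target; returns what it saw. */
    static long demo(long target) {
        BlockingCursor cursor = new BlockingCursor();
        long[] seen = {-1};
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = cursor.waitFor(target);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (long s = 0; s <= target; s++) {
            cursor.publish(s);
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(9)); // prints 9
    }
}
```

Writing the cursor before taking the lock, and re-checking it inside the lock on the consumer side, is what prevents a lost wake-up: either the consumer sees the new value, or it is already waiting when signalAll runs.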

Summary

With the flow clear, we can look at the key points.
