LMAX Disruptor Producer Неправильно выполнено обертывание + перезапись до того, как потребитель завершит чтение - PullRequest
0 голосов
/ 27 мая 2018

Я недавно познакомился с LMAX Disruptor и решил попробовать.Благодаря разработчикам, установка прошла быстро и без проблем.Но я думаю, что сталкиваюсь с проблемой, если кто-то может помочь мне с этим.

Проблема: Мне сказали, что когда производитель публикует событие, оно должно блокироваться, пока потребитель не получитшанс получить его, прежде чем обернуться.У меня есть барьер последовательности на стороне потребителя, и я могу подтвердить, что, если производитель не публикует никаких данных, вызов потребителя waitFor будет заблокирован.Но, похоже, что производитель не регулируется каким-либо образом, он просто оборачивает и перезаписывает необработанные данные в кольцевом буфере.

У меня есть производитель как исполняемый объект, работающий в отдельном потоке.

public class Producer implements Runnable {
    private final RingBuffer<Event> ringbuffer;
    public Producer(RingBuffer<Event> rb) {
        ringbuffer = rb;
    }
    public void run() {
           long next = 0L;
           while(true) {
               try {
                   next = ringbuffer.next();
                   Event e = ringbuffer.get(next);
                   ... do stuff...
                   e.set(... stuff...);
               }
               finally {
                   ringbuffer.publish(next);
               }
           }
    }
}

У меня есть потребитель, работающий в главном потоке.

public class Consumer {
     private final ExecutorService exec;
     private final Disruptor<Event> disruptor;
     private final RingBuffer<Event> ringbuffer;
     private final SequenceBarrier seqbar;
     private long seq = 0L;

     public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
    }

    public Data getData() {
         seqbar.waitFor(seq);
         Event e = ringbuffer.get(seq);
         seq++;
         return e.get();
    }
}

Наконец, я запускаю код так:

public class DisruptorTest {
     public static void main(String[] args){
         Consumer c = new Consumer();
         while (true) {
             c.getData();
             ... Do stuff ...
         }
}

1 Ответ

0 голосов
/ 27 мая 2018

Вам необходимо добавить последовательность стробирования (com.lmax.disruptor.Sequence) в ringBuffer, эта последовательность должна быть обновлена ​​в зависимости от того, в какой точке находится ваш потребитель.

Вы можете реализовать обработку событий с помощью интерфейса EventHandler и используя предоставленныйBatchEventProcessor (com.lmax.disruptor.BatchEventProcessor.BatchEventProcessor), который поставляется со встроенной последовательностью

Вот полностью рабочий пример

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.Disruptor;

public class Main {

   static class Event {

      int id;
   }

   static class Producer implements Runnable {

      private final RingBuffer<Event> ringbuffer;

      public Producer(RingBuffer<Event> rb) {
         ringbuffer = rb;
      }

      @Override
      public void run() {
         long next = 0L;
         int id = 0;
         while (true) {
            try {
               next = ringbuffer.next();
               Event e = ringbuffer.get(next);
               e.id = id++;
            } finally {
               ringbuffer.publish(next);
            }
         }
      }
   }

   static class Consumer {

      private final ExecutorService exec;
      private final Disruptor<Event> disruptor;
      private final RingBuffer<Event> ringbuffer;
      private final SequenceBarrier seqbar;
      private BatchEventProcessor<Event> processor;

      public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(() -> new Event(), 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         processor = new BatchEventProcessor<Main.Event>(
               ringbuffer, seqbar, new Handler());
         ringbuffer.addGatingSequences(processor.getSequence());

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
      }
   }

   static class Handler implements EventHandler<Event> {

      @Override
      public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
         System.out.println("Handling event " + event.id);
      }

   }

   public static void main(String[] args) throws Exception {

      Consumer c = new Consumer();
      while (true) {
         c.processor.run();
      }
   }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...