Я недавно познакомился с LMAX Disruptor и решил попробовать.Благодаря разработчикам, установка прошла быстро и без проблем.Но я думаю, что сталкиваюсь с проблемой, если кто-то может помочь мне с этим.
Проблема: Мне сказали, что когда производитель публикует событие, оно должно блокироваться, пока потребитель не получитшанс получить его, прежде чем обернуться.У меня есть барьер последовательности на стороне потребителя, и я могу подтвердить, что, если производитель не публикует никаких данных, вызов потребителя waitFor будет заблокирован.Но, похоже, что производитель не регулируется каким-либо образом, он просто оборачивает и перезаписывает необработанные данные в кольцевом буфере.
У меня есть производитель как исполняемый объект, работающий в отдельном потоке.
public class Producer implements Runnable {
private final RingBuffer<Event> ringbuffer;
public Producer(RingBuffer<Event> rb) {
ringbuffer = rb;
}
public void run() {
long next = 0L;
while(true) {
try {
next = ringbuffer.next();
Event e = ringbuffer.get(next);
... do stuff...
e.set(... stuff...);
}
finally {
ringbuffer.publish(next);
}
}
}
}
У меня есть потребитель, работающий в главном потоке.
public class Consumer {
private final ExecutorService exec;
private final Disruptor<Event> disruptor;
private final RingBuffer<Event> ringbuffer;
private final SequenceBarrier seqbar;
private long seq = 0L;
public Consumer() {
exec = Executors.newCachedThreadPool();
disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
ringbuffer = disruptor.start();
seqbar = ringbuffer.newBarrier();
Producer producer = new Producer(ringbuffer);
exec.submit(producer);
}
public Data getData() {
seqbar.waitFor(seq);
Event e = ringbuffer.get(seq);
seq++;
return e.get();
}
}
Наконец, я запускаю код так:
public class DisruptorTest {
public static void main(String[] args){
Consumer c = new Consumer();
while (true) {
c.getData();
... Do stuff ...
}
}