Я играю с Disruptor framework и обнаруживаю, что мои обработчики событий не вызываются.
Вот мой установочный код:
private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private void initializeDisruptor() {
if (disruptor != null)
return;
disruptor =
new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
new SingleThreadedClaimStrategy(BUFFER_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(searchTermMatchingHandler)
.then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);
this.ringBuffer = disruptor.start();
}
В другом месте я публикую события.Я попробовал каждый из следующих двух подходов:
Подход публикации событий A:
private void handleStatus(final Status status)
{
long sequence = ringBuffer.next();
TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
ringBuffer.publish(sequence);
}
В этом сценарии я обнаружил, что первый EventHandler
получаетвызывается, но никогда ничего больше.
Подход публикации событий B:
private void handleStatus(final Status status)
{
disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
@Override
public TwitterStatusReceivedEvent translateTo(
TwitterStatusReceivedEvent event, long sequence) {
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
return event;
}
});
}
В этом сценарии я обнаружил, что ни один из обработчиков событий вообще не вызывается.
Что я делаю не так?
Обновление
Вот мой EventHandler полностью.Как я должен сигнализировать, что обработка завершена?
public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {
@Override
public void onEvent(TwitterStatusReceivedEvent event, long sequence,
boolean endOfBatch) throws Exception {
String statusText = event.getStatus().getText();
for (Instrument instrument : event.getSearchInstruments())
{
if (statusText.contains(instrument.getSearchTerm()))
{
event.setMatchedInstrument(instrument);
break;
}
}
}
}