Disruptor - EventHandlers не вызывается - PullRequest
6 голосов
/ 29 ноября 2011

Я играю с Disruptor framework и обнаруживаю, что мои обработчики событий не вызываются.

Вот мой установочный код:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

В другом месте я публикую события.Я попробовал каждый из следующих двух подходов:

Подход публикации событий A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

В этом сценарии я обнаружил, что первый EventHandler получаетвызывается, но никогда ничего больше.

Подход публикации событий B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

В этом сценарии я обнаружил, что ни один из обработчиков событий вообще не вызывается.

Что я делаю не так?

Обновление

Вот мой EventHandler полностью.Как я должен сигнализировать, что обработка завершена?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}

Ответы [ 3 ]

7 голосов
/ 29 ноября 2011

Каждый обработчик событий должен работать в своем собственном потоке, который не завершится, пока вы не выключите прерыватель.Поскольку вы используете однопоточный исполнитель, будет работать только первый обработчик событий, который будет выполнен.(Класс Disruptor хранит каждый обработчик в хэш-карте, поэтому работа каждого обработчика будет отличаться)

Если вы переключитесь на cachedThreadPool, вы обнаружите, что все это запускается.Вам не нужно будет управлять порядковыми номерами, потому что все это обрабатывается EventProcessor, который класс Disruptor устанавливает и управляет для вас.Просто обрабатывать каждое событие, которое вы получаете, совершенно правильно.

2 голосов
/ 29 ноября 2011

Необходимо убедиться, что ваш searchTermMatchingHandler обновляет свой порядковый номер после обработки события. Дальнейшие события EventHandlers (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) будут проверять порядковый номер searchTermMatchingHandler, чтобы увидеть, какие события они могут получить из кольцевого буфера.

0 голосов
/ 24 марта 2014

У меня была та же проблема, но это было потому, что я создавал Disruptor с помощью Spring (Java config) и создавал Executor в том же методе @Bean, что и Disruptor.

Я исправил проблему, создав экземпляр Executor в отдельном методе @Bean.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...