Как вызвать и обработать эти события Spring Kafka - PullRequest
0 голосов
/ 07 мая 2019

В моем проекте Spring Boot, где у меня есть несколько потребителей Spring Kafka, я добавил несколько слушателей событий, чтобы следить за состоянием этих потребителей.Вот код:

@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

}

Кто-нибудь знает, при каких обстоятельствах эти события будут выброшены (и, возможно, как я могу вручную вызвать эти события для целей тестирования)?А также для последних трех событий (ConsumerStoppedEvent, ListenerContainerIdleEvent и NonResponsiveConsumerEvent), когда я получаю одно из них, необходимо ли вмешательство человека для решения проблемы (например, перезапуск серверов для повторного создания потребителей)?Спасибо!

1 Ответ

1 голос
/ 07 мая 2019

Вы можете эмулировать их все, введя фабрику Mock для потребителей в контейнер.

  • ConsumerStoppedEvent испускается при stop() контейнере.
  • ListenerContainerIdleEvent просто означает, что в idleEventInterval не было получено никаких записей, поэтому обычно это не означает, что есть проблема.
  • NonResponsiveConsumerEvent - трудно сказать; со старыми клиентами poll() блокируется, если сервер не работает, поэтому мы не можем отправлять события простоя (или делать что-либо).

Я не знаю, сможете ли вы получить их с более новыми клиентами; но чтобы смоделировать его, вам просто нужно заблокировать метод фиктивного потребителя poll() на достаточно долгое время, чтобы задача монитора могла обнаружить проблему и выдать событие.

...