Concurreny Kafka Listener - Как обрабатывать события Stop / Idle, запущенные из 6 потоков - PullRequest
0 голосов
/ 16 апреля 2020

У меня есть 2 слушателя Кафки ежедневно / еженедельно. Daily имеет autoStartup = true, а Weekly имеет autoStartup = false. У меня есть конечная точка, чтобы остановить Daily, который работает и запустить Weekly. После того, как Еженедельник закончил потреблять сообщения, я жду, пока не сработает событие простоя (установленное на 1 минуту), где я останавливаюсь Еженедельно. Теперь я принимаю участие в мероприятии STOP на Weekly, где я начинаю Daily. Теперь проблема в том, что у меня установлен параллелизм на 6, поэтому я получаю 6 событий ожидания и 6 событий остановки. Я справился как ниже. Я хотел бы знать, является ли это хорошей практикой или есть что-нибудь лучше?

Я собираю все события Daily stop в ConcurrentHashMap. Как только он достигнет числа одновременных обращений, это означает, что все 6 потоков ежедневного прослушивателя остановлены и могут запустить еженедельный прослушиватель.

private void processDailyStopEvent(ConsumerStoppedEvent event)
    {
        LOGGER.info("Processing DAILY Stop events");
        KafkaMessageListenerContainer source = (KafkaMessageListenerContainer) event.getSource();
        ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) event.getContainer(ConcurrentMessageListenerContainer.class);

        eventMap.get(“dailyStop”).add(source.getListenerId());
        LOGGER.info("Added ListenerId {} to Map<dailyStop>", source.getListenerId());

        if (eventMap.get(“dailyStop”).size() == container.getConcurrency()) {
            LOGGER.info("All DAILY Stop events are captured. Clearing the Map<dailyStop>");
            eventMap.get(“dailyStop”).clear();

            LOGGER.info("Starting WEEKLY Consumer now.");
            kafkaService.startWeeklyConsumer();
        }
    }


1 Ответ

0 голосов
/ 16 апреля 2020

То, что у вас есть, - разумный подход.

В качестве альтернативы вы можете просто дождаться события остановки параллельного контейнера (где источник равен контейнеру) - оно публикуется после остановки всех дочерних контейнеров. .

...