У меня есть 2 слушателя Кафки ежедневно / еженедельно. Daily имеет autoStartup = true, а Weekly имеет autoStartup = false. У меня есть конечная точка, чтобы остановить Daily, который работает и запустить Weekly. После того, как Еженедельник закончил потреблять сообщения, я жду, пока не сработает событие простоя (установленное на 1 минуту), где я останавливаюсь Еженедельно. Теперь я принимаю участие в мероприятии STOP на Weekly, где я начинаю Daily. Теперь проблема в том, что у меня установлен параллелизм на 6, поэтому я получаю 6 событий ожидания и 6 событий остановки. Я справился как ниже. Я хотел бы знать, является ли это хорошей практикой или есть что-нибудь лучше?
Я собираю все события Daily stop в ConcurrentHashMap. Как только он достигнет числа одновременных обращений, это означает, что все 6 потоков ежедневного прослушивателя остановлены и могут запустить еженедельный прослушиватель.
private void processDailyStopEvent(ConsumerStoppedEvent event)
{
LOGGER.info("Processing DAILY Stop events");
KafkaMessageListenerContainer source = (KafkaMessageListenerContainer) event.getSource();
ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) event.getContainer(ConcurrentMessageListenerContainer.class);
eventMap.get(“dailyStop”).add(source.getListenerId());
LOGGER.info("Added ListenerId {} to Map<dailyStop>", source.getListenerId());
if (eventMap.get(“dailyStop”).size() == container.getConcurrency()) {
LOGGER.info("All DAILY Stop events are captured. Clearing the Map<dailyStop>");
eventMap.get(“dailyStop”).clear();
LOGGER.info("Starting WEEKLY Consumer now.");
kafkaService.startWeeklyConsumer();
}
}