Потребитель Kafka - приостановить опрос события из указанного раздела c kafka topi c, чтобы использовать его в качестве отложенной очереди - PullRequest
0 голосов
/ 04 февраля 2020

У нас есть сценарий в нашей системе, где kafka topi c XYZ Данные пользователя публикуются другим производящим приложением A (другая система), и мое приложение B потребляет данные из этой топи c.

Требование заключается в том, что приложению B необходимо использовать это событие через 45 минут после (или любого настраиваемого времени), когда оно помещается в kafka topi c XYZ на A (причина этой задержки что другой REST API некоторой системы C должен инициировать на основе этого события Сведения о пользователе для конкретного пользователя, чтобы подтвердить, установлен ли какой-либо флаг для этого пользователя, и этот флаг может быть установлен в любой момент в течение этих 45 минут, хотя он мог быть решен, если бы C не имел возможности опубликовать sh для kafka или уведомить нас любым способом).

Наше приложение B написано весной.

Решением, которое я попробовал, было получение события от Kafka и проверка метки времени первого события в очереди, и если это событие уже составляет 45 минут затем обработайте его или, если оно меньше 45 минут, приостановите опрос контейнера kafka на это время до 45 минут, используя метод MessageListnerContainer pause () . Нечто подобное ниже -

@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
        public void delayedConsumer(@Payload  String message,
                                    Acknowledgment acknowledgment) {

            UserDataEvent userDataEvent = null;
            try {
                 userDataEvent = this.mapper.readValue(message, TopicRequest.class);
            } catch (JsonProcessingException e) {
                logger.error("error while parsing message");
            }
            MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
            if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
 {
                long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
                // give negative ack to put already polled messages back to kafka topic
                acknowledgment.nack(1000);
                // pause container, and later resume it  
                delayedContainer.pause();
                ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
                scheduledExecutorService.schedule(() -> {
                    delayedContainer.resume();
                }, sleepTimeForPolling, TimeUnit.MILLISECONDS);
                return;
            }
            // if message was already 45 minutes old then process it
            this.service.processMessage(userDataEvent);
            acknowledgment.acknowledge();
        }

Хотя это работает для одного раздела, но я не уверен, что это правильный подход, есть какие-либо комментарии по этому поводу? также я вижу несколько разделов, это вызовет проблемы, как указано выше, вызов метода pause приостановит весь контейнер, и если один из разделов имеет старое сообщение, он не будет использован, если контейнер был приостановлен из-за нового сообщения в каком-либо другом разделе. Могу ли я использовать эту паузу logi c на уровне раздела как-то?

Есть ли лучшее / рекомендуемое решение для достижения этой отложенной обработки после определенного количества настраиваемого времени, которое я могу использовать в этом сценарии вместо того, чтобы делать то, что я делал выше?

1 Ответ

1 голос
/ 04 февраля 2020

Кафка на самом деле не предназначена для такого сценария ios.

Один из способов, с помощью которых я мог бы убедиться, что техника работы будет состоять в том, чтобы установить параллелизм контейнера равным количеству разделов в топи c так что каждый раздел обрабатывается другим потребителем в другом потоке; затем приостановите / возобновите отдельные Consumer<?, ?> s вместо всего контейнера.

Для этого добавьте Consumer<?, ?> в качестве дополнительного параметра; чтобы возобновить работу потребителя, установите idleEventInterval и проверьте таймер в приемнике событий (ListenerContainerIdleEvent). Consumer<?, ?> является свойством события, поэтому вы можете позвонить туда resume().

...