У нас есть сценарий в нашей системе, где kafka topi c XYZ Данные пользователя публикуются другим производящим приложением A (другая система), и мое приложение B потребляет данные из этой топи c.
Требование заключается в том, что приложению B необходимо использовать это событие через 45 минут после (или любого настраиваемого времени), когда оно помещается в kafka topi c XYZ на A (причина этой задержки что другой REST API некоторой системы C должен инициировать на основе этого события Сведения о пользователе для конкретного пользователя, чтобы подтвердить, установлен ли какой-либо флаг для этого пользователя, и этот флаг может быть установлен в любой момент в течение этих 45 минут, хотя он мог быть решен, если бы C не имел возможности опубликовать sh для kafka или уведомить нас любым способом).
Наше приложение B написано весной.
Решением, которое я попробовал, было получение события от Kafka и проверка метки времени первого события в очереди, и если это событие уже составляет 45 минут затем обработайте его или, если оно меньше 45 минут, приостановите опрос контейнера kafka на это время до 45 минут, используя метод MessageListnerContainer pause () . Нечто подобное ниже -
@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
public void delayedConsumer(@Payload String message,
Acknowledgment acknowledgment) {
UserDataEvent userDataEvent = null;
try {
userDataEvent = this.mapper.readValue(message, TopicRequest.class);
} catch (JsonProcessingException e) {
logger.error("error while parsing message");
}
MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
{
long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
// give negative ack to put already polled messages back to kafka topic
acknowledgment.nack(1000);
// pause container, and later resume it
delayedContainer.pause();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.schedule(() -> {
delayedContainer.resume();
}, sleepTimeForPolling, TimeUnit.MILLISECONDS);
return;
}
// if message was already 45 minutes old then process it
this.service.processMessage(userDataEvent);
acknowledgment.acknowledge();
}
Хотя это работает для одного раздела, но я не уверен, что это правильный подход, есть какие-либо комментарии по этому поводу? также я вижу несколько разделов, это вызовет проблемы, как указано выше, вызов метода pause приостановит весь контейнер, и если один из разделов имеет старое сообщение, он не будет использован, если контейнер был приостановлен из-за нового сообщения в каком-либо другом разделе. Могу ли я использовать эту паузу logi c на уровне раздела как-то?
Есть ли лучшее / рекомендуемое решение для достижения этой отложенной обработки после определенного количества настраиваемого времени, которое я могу использовать в этом сценарии вместо того, чтобы делать то, что я делал выше?