Если мои потребители не могут обработать сообщение на данный момент, я хочу, чтобы оно отправлялось sh с задержкой на 5 минут Topi c, а если оно не обрабатывается оттуда, я хочу, чтобы оно было sh до 30 минут с задержкой топи c. Если и здесь произойдет сбой, он хотел бы отправить sh его в очередь недоставленных писем.
5-минутная задержка topi c: Потребитель должен прослушать через 5 минут после первого обработка.
30 минут с задержкой topi c: Потребитель должен слушать через 30 минут после предыдущего сбоя.
Как мне разработать отложенную очередь? Легко подключить sh к Kafka topi c после сбоя, но как мой потребитель / слушатель должен слушать это после 5 или 30 минут задержки?
Я использую SpringKafka, чтобы потребители могли слушать топи c как показано ниже -
@KafkaListener(topics = "${kafka.topic}")
public void receive1(String payload) {
logger.info("Getting message on receiver-1");
submitPayloadToExecutor(payload);
}
У меня есть реализация ниже для моего собственного проекта, может ли кто-нибудь указать на откат и любое новое предложение ??
@KafkaListener(topics = "${kafka.topic}")
public void receive3(String payload) {
logger.info("Getting message on receiver-3");
submitPayloadToExecutor(payload);
}
private void submitPayloadToExecutor(String payload) {
StartupService startupService = StartupServiceSingleton.INSTANCE.getStartupServiceInstance();
ObjectMapper mapper = startupService.getConverter().getObjectMapper();
PublishPostProcessorEntity publishPostProcessorEntity = null;
try {
publishPostProcessorEntity = mapper.readValue(payload, PublishPostProcessorEntity.class);
sleepForDelayedPublishedEntity(publishPostProcessorEntity);
// ... do some work
topicExecutorService.submit(publishPostProcessorEntity);
} catch (Exception e) {
// Work on exception
}
}
private void sleepForDelayedPublishedEntity(PublishPostProcessorEntity publishPostProcessorEntity) {
if (publishPostProcessorEntity instanceof DelayedPublishPostProcessorEntity) {
DelayedPublishPostProcessorEntity delayedPublishPostProcessorEntity = (DelayedPublishPostProcessorEntity) publishPostProcessorEntity;
// Fetch the topicName and sleep based on the configuration
long pushedTimeStamp = delayedPublishPostProcessorEntity.getPushedTimeStamp();
delayedPublishPostProcessorEntity.setComingTopicName(delayedPublishPostProcessorEntity.getNextTopicName());
long currentTimeStamp = System.currentTimeMillis();
if (CMSKafkaConstants.FIVE_MINUTES_DELAYED_TOPIC
.equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {
long timeElapsed = currentTimeStamp - pushedTimeStamp;
if ((Long.parseLong(firstDelay)-timeElapsed) > 0) {
// wait for timeToWait
try {
Thread.sleep(Long.parseLong(firstDelay)-timeElapsed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else if (CMSKafkaConstants.THRITY_MINUTES_DELAYED_TOPIC
.equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {
long timeElapsed = currentTimeStamp - pushedTimeStamp;
if ((Long.parseLong(secondDelay)-timeElapsed) > 0) {
try {
Thread.sleep(Long.parseLong(secondDelay)-timeElapsed);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}