Как мне реализовать отложенную архитектуру topi c в Kafka - PullRequest
2 голосов
/ 28 мая 2020

Если мои потребители не могут обработать сообщение на данный момент, я хочу, чтобы оно отправлялось sh с задержкой на 5 минут Topi c, а если оно не обрабатывается оттуда, я хочу, чтобы оно было sh до 30 минут с задержкой топи c. Если и здесь произойдет сбой, он хотел бы отправить sh его в очередь недоставленных писем.

5-минутная задержка topi c: Потребитель должен прослушать через 5 минут после первого обработка.

30 минут с задержкой topi c: Потребитель должен слушать через 30 минут после предыдущего сбоя.

Как мне разработать отложенную очередь? Легко подключить sh к Kafka topi c после сбоя, но как мой потребитель / слушатель должен слушать это после 5 или 30 минут задержки?

Я использую SpringKafka, чтобы потребители могли слушать топи c как показано ниже -

@KafkaListener(topics = "${kafka.topic}")
public void receive1(String payload) {
    logger.info("Getting message on receiver-1");
    submitPayloadToExecutor(payload);
}

У меня есть реализация ниже для моего собственного проекта, может ли кто-нибудь указать на откат и любое новое предложение ??

    @KafkaListener(topics = "${kafka.topic}")
    public void receive3(String payload) {

        logger.info("Getting message on receiver-3");
        submitPayloadToExecutor(payload);
    }

    private void submitPayloadToExecutor(String payload) {

        StartupService startupService = StartupServiceSingleton.INSTANCE.getStartupServiceInstance();

        ObjectMapper mapper = startupService.getConverter().getObjectMapper();

        PublishPostProcessorEntity publishPostProcessorEntity = null;
        try {
            publishPostProcessorEntity = mapper.readValue(payload, PublishPostProcessorEntity.class);
            
            sleepForDelayedPublishedEntity(publishPostProcessorEntity);

            // ... do some work

            topicExecutorService.submit(publishPostProcessorEntity);
        } catch (Exception e) {
            // Work on exception
        }
    }

    private void sleepForDelayedPublishedEntity(PublishPostProcessorEntity publishPostProcessorEntity) {

        if (publishPostProcessorEntity instanceof DelayedPublishPostProcessorEntity) {

            DelayedPublishPostProcessorEntity delayedPublishPostProcessorEntity = (DelayedPublishPostProcessorEntity) publishPostProcessorEntity;

            // Fetch the topicName and sleep based on the configuration
            long pushedTimeStamp = delayedPublishPostProcessorEntity.getPushedTimeStamp();
            delayedPublishPostProcessorEntity.setComingTopicName(delayedPublishPostProcessorEntity.getNextTopicName());

            long currentTimeStamp = System.currentTimeMillis();

            if (CMSKafkaConstants.FIVE_MINUTES_DELAYED_TOPIC
                    .equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {

                long timeElapsed = currentTimeStamp - pushedTimeStamp;
                if ((Long.parseLong(firstDelay)-timeElapsed) > 0) {
                    // wait for timeToWait
                    try {
                        Thread.sleep(Long.parseLong(firstDelay)-timeElapsed);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            } else if (CMSKafkaConstants.THRITY_MINUTES_DELAYED_TOPIC
                    .equalsIgnoreCase(delayedPublishPostProcessorEntity.getNextTopicName())) {

                long timeElapsed = currentTimeStamp - pushedTimeStamp;
                if ((Long.parseLong(secondDelay)-timeElapsed) > 0) {
                    try {
                        Thread.sleep(Long.parseLong(secondDelay)-timeElapsed);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }

        }

    }

1 Ответ

0 голосов
/ 28 мая 2020

Вы можете останавливать и запускать потребителей по мере необходимости с помощью KafkaListenerEndpointRegistry (используйте getListenerContainer(id).start()).

...