Весенний кафка, повторный звонок не работает - PullRequest
0 голосов
/ 13 февраля 2020

Пытаюсь реализовать весенний кафка, повторный звонок. Ниже мой код

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(context -> {
        LOG.info(" In recovery callback method !!");
        ConsumerRecord<?, ?> record = (ConsumerRecord) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        Map conciseMap = (Map)record.value();
        //publisher.sendMessage((String)conciseMap.get("messagePayload"));
        ((Acknowledgment)context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT)).acknowledge();
        return null;
    });
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setStatefulRetry(true);
    return factory;
}

RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(500L);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3, Collections.singletonMap(RuntimeException.class, true));
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

Но обратного звонка не происходит. Журнал «Метод обратного вызова восстановления» не печатается. В чем может быть проблема?

Используемая версия Spring kafka: 2.2.6.RELEASE

1 Ответ

0 голосов
/ 14 февраля 2020

При использовании повторения с отслеживанием состояния необходимо добавить SeekToCurrentErrorHandler к фабрике контейнеров.

В текущей версии это будет

factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));

См. документацию .

Вот полное приложение ...

@SpringBootApplication
public class So60210116Application {

    private static final Logger LOG = LoggerFactory.getLogger(So60210116Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So60210116Application.class, args);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
            LOG.info(" In recovery callback method !!");
            ConsumerRecord<?, ?> record = (ConsumerRecord) context
                    .getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
            Map conciseMap = (Map) record.value();
            // publisher.sendMessage((String)conciseMap.get("messagePayload"));
            ((Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT))
                    .acknowledge();
            return null;
        });
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setStatefulRetry(true);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));
        return factory;
    }

    RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(500L);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
                Collections.singletonMap(RuntimeException.class, true));
        retryTemplate.setRetryPolicy(retryPolicy);
        return retryTemplate;
    }


    @KafkaListener(id = "so60210116", topics = "so60210116")
    public void listen(String in) {
        throw new RuntimeException("test");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60210116").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60210116", "foo");
        };
    }

}

В 2.2.x вместо

factory.setErrorHandler(new SeekToCurrentErrorHandler(Integer.MAX_VALUE));

.

С 2.3 и более поздние версии теперь можно исключить RetryTemplate с соответствующей конфигурацией BackOff и recoverer в обработчике ошибок.

...