Spring KafkaListener не выбирает самое раннее незафиксированное смещение после возобновления паузы - PullRequest
0 голосов
/ 17 марта 2019

UseCase - Приостановка приостановки Spring KafkaListener на несколько секунд в случае Retryable Exceptions, поступающих от внешней службы, и требуется возобновить работу с самого раннего незафиксированного смещения.

Проблема, которую я имею - Ниже приведена реализация.

1) Без использования поиска - после возобновления весны kafkalistener выбирает последнее сообщение, поступающее в раздел раздела.Это побеждает цель (сообщения между последним принятым смещением и последним смещением отсутствуют)

2) При использовании поиска - я не знаю, как получить дескриптор kafkaconsumer

ИСТОЧНИК КОД

Метод Лизенера в приемнике

 @KafkaListener(topics = "${kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
    public void onReceiving(@Payload ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {

            try {
                Event event = translate(consumerRecord);
                someService.processEvent(event, consumerRecord);
                commitOffset(acknowledgment)
            } catch(ConsumerException e) {
                //DO NOT commit offset
            }
        }

    private void commitOffset(Acknowledgment acknowledgment) {
        acknowledgment.acknowledge();
    }
Service
public void processEvent(Event event, ConsumerRecord<String, String> consumerRecord) {

    try {
        //call an external API to get realTime event details
        //Have a retry on this client
       BusinessEntity businessEntity = externalServiceClient.get(event);
       //process the Entity 
       anotherService.process(businessEntity);
    } catch(RetryableException re) {
        //feign.RetryableException
        //we are using feign declarative clients 
        consumerErrorHandler.handle(re, consumerRecord);
    }
}

ErrorHandler -> реализует org.springframework.kafka.listener.ErrorHandler

public class ConsumerErrorHandler implements ErrorHandler {

    @Autowired
    private final KafkaListenerEndpointRegistry registry;

    //org.springframework.core.task.SimpleAsyncTaskExecutor
    @Autowrired 
    private final Executor executor;

    @Autowired
    private Consumer<String, String> kafkaConsumer;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {

        //Trying to delegate this to a new Async thread.

        executor.execute(() -> {
            registry.getListenerContainers().forEach(container -> {

                if ((!container.isContainerPaused() || !container.isPauseRequested())) {
                    log.info("STOPPING_CONSUMER on error");

                    Optional<TopicPartition> topicPartition = container.getAssignedPartitions().stream().filter(a -> a.partition() == data.partition()).findFirst();

                    container.pause();
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }

                    log.info("BEFORE_RESUME");
                    log.info("SEEK CONSUMER before RESUME to this offset: "+data.offset());

                    topicPartition.ifPresent(a ->
                    {
                        log.info("Seek from the current position: " + data.offset());
                        kafkaConsumer.seek(a, data.offset());
                    });

                    container.resume();

                    log.info("RESUMING_CONSUMER  after seek");

                    topicPartition.ifPresent(a -> {
                        log.info("CONSUMER is up NOW ??");
                    });
                }
            });
        });

    }
}

Потребительские конфигурации

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> confMap = new HashMap<>();
        confMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, pubSubServers);
        confMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        confMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        confMap.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupIdConfig);
        confMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "50000");
        confMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000");
        confMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        confMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase());
        if (this.securityProtocol.equalsIgnoreCase(SSL)) {
            confMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, this.securityProtocol);
            confMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                    this.getClass().getResource(clientTrustStoreLocation).getPath());
            confMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTrustStorePassword);
            confMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                    this.getClass().getResource(this.clientKeyStoreLocation).getPath());
            confMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslKeyStorePassword);
            confMap.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslKeyPassword);
            confMap.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,null);
        }
        return confMap;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConcurrency("1");
      factory.getContainerProperties().setAckOnError(false);
      factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
      factory.getContainerProperties().setConsumerTaskExecutor(taskExecutor());
      factory.setConsumerFactory(consumerFactory());
      factory.setErrorHandler(consumerErrorHandler);
      factory.setRetryTemplate(retryTemplate());
      return factory;
    }

    @Bean
    public AsyncListenableTaskExecutor taskExecutor() {
      return createTaskExecutor("1");
    }

     private RetryTemplate retryTemplate() {
         RetryTemplate template = new RetryTemplate();
         template.setRetryPolicy(retryPolicy());
         template.setBackOffPolicy(backOffPolicy());
         return template;
    }

    private BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(1000);
        return policy;
    }

    private RetryPolicy retryPolicy() {
         SimpleRetryPolicy policy = new SimpleRetryPolicy();
         policy.setMaxAttempts("1");
         return policy;
    }

1 Ответ

0 голосов
/ 17 марта 2019

Используйте ConsumerAwareErrorHandler.

Вы не можете выполнить поиск в другом потоке . См. KafkaConsumer javadocs - это не потокобезопасно.

Вы также должны искать оставшиеся записи для других тем / разделов (если только у вас нет только одной темы / раздела).

Наконец, вы не должны выходить из обработчика ошибок до тех пор, пока контейнер не будет приостановлен - в противном случае произойдет гонка, и потребитель может сделать еще один poll() перед этим pause() es.

См. SeekToCurrentErrorHandler и ContainerStoppingErrorHandler для примеров того, как делать такие вещи. stop() должен быть вызван в другом потоке, чтобы избежать тупиковой ситуации, но вы можете pause() контейнер в потоке потребителя (он просто устанавливает флаг, так что потребитель будет pause() перед следующим poll().

Для resume() контейнера используйте ApplicationListener или @EventListener для прослушивания событий простоя контейнера для приостановленного контейнера (установите idleEventIterval, чтобы получить эти события.

...