Смещение коммитов пользователя Kafka вручную - PullRequest
0 голосов
/ 06 ноября 2018

Я внедряю и поток интеграции dsl spring, который принимает сообщения от Kafka

Фрагмент кода:

return IntegrationFlows.from(
                Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
                        kafkaPropertiesConfiguration.getTelemetryDataTopic()))
                })
                .handle(bla.someImportantOperation())
                //TODO:do manual commit here
                //.handle(consumer.commitSync())

                .get();

Я хотел бы знать, как я могу выполнить commitSync вручную, но только после успешного завершения .handle(bla.someImportantOperation()).

Я не знаю, как получить ссылку для потребителя, так как я использую DefaultKafkaConsumerFactory, буду признателен за любую помощь.

Вот мои потребительские свойства, которые я использую для создания потребителя:

consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());

consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

1 Ответ

0 голосов
/ 06 ноября 2018

Kafka.messageDrivenChannelAdapter() предоставляет для вас перехватчик конфигурации:

.configureListenerContainer(c ->
                                c.ackMode(ContainerProperties.AckMode.MANUAL))

Обратите внимание на вариант, который я предоставляю.

Прочитайте его Javadocs, а затем AcknowledgingMessageListener. Есть упоминание о Acknowledgment. этот присутствует в заголовках сообщений через KafkaHeaders.ACKNOWLEDGMENT.

Итак, то, что вам нужно в вашем //.handle(consumer.commitSync()), выглядит примерно так:

.handle(m -> headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge())

См. Дополнительную информацию в Spring для Apache Kafka Docs: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#committing-offsets

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...