Я внедряю и поток интеграции dsl spring, который принимает сообщения от Kafka
Фрагмент кода:
return IntegrationFlows.from(
Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(kafkaTelemetryDataConsumerConfiguration.getConsumerProperties()),
kafkaPropertiesConfiguration.getTelemetryDataTopic()))
})
.handle(bla.someImportantOperation())
//TODO:do manual commit here
//.handle(consumer.commitSync())
.get();
Я хотел бы знать, как я могу выполнить commitSync вручную, но только после успешного завершения .handle(bla.someImportantOperation())
.
Я не знаю, как получить ссылку для потребителя, так как я использую DefaultKafkaConsumerFactory, буду признателен за любую помощь.
Вот мои потребительские свойства, которые я использую для создания потребителя:
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, kafkaPropertiesConfiguration.getClientIdConfig());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConfiguration.getGroupIdConfig());
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");