Я использую spring-kafka 1.1.3.RELEASE и kafka-clients 0.10.0.0, и я хочу установить setConsumerRebalanceListener на заводе, как это, но я не знаю, как заставить потребителя сохранить потребительский раздел. Спасибо за любые предложения!
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerBatchContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setConsumerTaskExecutor(execD());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
for (TopicPartition partition:collection){
//TODO how to get consumer? saveOffsetInExternalStore(consumer,partition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
for (TopicPartition partition:collection){
//TODO how to get consumer?
consumer.seek();
}
}
});
factory.setBatchListener(true);
return factory;
}
Я использую фабрику вот так:
@KafkaListener(group = "CID_alikafka_B024",topicPattern = "data_.*",containerFactory = "kafkaListenerBatchContainerFactory")
public void receive2(List<String> data,Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topicName,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> messageKeys) {
logger.info("start of batch receive");
}
Я знаю, что у spring kafka 2.1.9 есть ConsumerAwareRebalanceListener, как это, но я хочу использовать spring kafka 1.1.3.RELEASE для совместимого kafka 0.10.0.0, у нас kafka - версия 0.10.0.0
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
Мой пом:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
<exclusions>
<!-- exclude kafka version problem-->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>