проблема
использование Spring Kafka версия: org.springframework.kafka: spring-kafka: 2.0.0.RELEASE.
поиск смещения с помощью ConsumerAwareRebalanceListener.onPartitionsAssigned, но получил пустую TopicPartitions.
пробовал ConsumerSeekAware, но также получил пустое назначение.
код
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getConsumerBootstrapServers());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, config.getConsumerGroupId());
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getConsumerMaxPollRecords());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<Object, Object>(configs);
ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setMessageListener(messageListener);
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
log.info("onPartitionsAssigned_start,topic:{},partitionOffset:{},"
+ "consumer:{},topicPartitions:{},assignment:{}",
config.getTopic(),JSON.toJSONString(partitionOffset),
JSON.toJSONString(consumer),JSON.toJSONString(partitions),JSON.toJSONString(consumer.assignment()));
for(TopicPartition tp:partitions) {
if(!tp.topic().equals(config.getTopic())) {
continue;
}
Long offsetStart = partitionOffset.get(tp.partition());
if(offsetStart==null) {
continue;
}
consumer.seek(tp, offsetStart);
log.info("consumer_seek,TopicPartition:{},offsetStart:{}",
JSON.toJSONString(tp),offsetStart);
}
}
});
container = new ConcurrentMessageListenerContainer<>(factory,containerProperties);
container.setConcurrency(config.getConcurrency());
container.start();
я использую API неправильно?Любые предложения будут с благодарностью!