Я пытаюсь добиться смещения потребителя от меток времени с использованием реактора-кафки У меня была логика c для поиска в методе doOnConsumer (), но похоже, что doOnConsumer () никогда не вызывался. Возможно, потому что я не использовал API правильно.
public Disposable consumeMessages(List<String> topics) {
ReceiverOptions<Integer, String> options = receiverOptions
.addAssignListener(partitions -> {
log.info("PARTITIONS ASSIGNED -> START");
long currentTime = System.currentTimeMillis();
// Initialize HashMap used for partition/timestamp mapping
timestampsToSearch = new HashMap<>();
partitions.forEach(p -> {
TopicPartition topicPartition = p.topicPartition();
log.info("Topic:{} Partition:{} Offset:{}", topicPartition.topic(), topicPartition.partition(), p.position());
long lastProcessedRecordTimestamp = spConsumerTimestampTracker.getLastProcessedTimestamp(topicPartition.topic(),topicPartition.partition());
log.info("lastProcessedRecordTimestamp:{}", lastProcessedRecordTimestamp);
if(lastProcessedRecordTimestamp==-1) {
//No previous timestamp found set it to current time
timestampsToSearch.put(topicPartition,currentTime);
} else {
timestampsToSearch.put(topicPartition,lastProcessedRecordTimestamp);
}
});
log.info("PARTITIONS ASSIGNED -> END");
})
.addRevokeListener(partitions -> {
log.debug("onPartitionsRevoked {}", partitions);
log.info("PARTITIONS REVOKED -> START");
partitions.forEach(p -> {
TopicPartition topicPartition = p.topicPartition();
log.info("Topic:{} Partition:{} Offset:{}", topicPartition.topic(), topicPartition.partition(), p.position());
});
log.info("PARTITIONS REVOKED -> END");
})
.subscription(topics);
Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive();
KafkaReceiver.create(options)
.doOnConsumer(consumer -> {
log.info("doOnConsumer:START");
// Get the offsets for each partition
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(timestampsToSearch);
// Seek to the specified offset for each partition
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
log.info("Consumer is seeking to {} offset for topic {} & partition {}", entry.getValue().offset(), entry.getKey().topic(), entry.getKey().partition());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
return null;
}).concatWith(kafkaFlux);
return kafkaFlux
.map(record -> {
messageProcessor.processMessage(record);
record.receiverOffset().commit();
log.info("Committed Kafka message from topic :{} with offset:{}", record.topic(), record.offset());
spConsumerTimestampTracker.addPartitionLastProcessedTimestamp(record.topic(),record.partition(),record.timestamp());
return record;
})
.subscribe(record -> log.info("Consumer successfully subscribed from topic :{} with offset:{}", record.topic(), record.offset()));
}
Любая помощь очень ценится. Заранее спасибо.