Ищите потребителя смещения от отметок времени в реакторе-кафке - PullRequest
0 голосов
/ 23 марта 2020

Я пытаюсь добиться смещения потребителя от меток времени с использованием реактора-кафки У меня была логика c для поиска в методе doOnConsumer (), но похоже, что doOnConsumer () никогда не вызывался. Возможно, потому что я не использовал API правильно.

 public Disposable consumeMessages(List<String> topics) {

    ReceiverOptions<Integer, String> options = receiverOptions
            .addAssignListener(partitions -> {
                log.info("PARTITIONS ASSIGNED -> START");
                long currentTime = System.currentTimeMillis();
                // Initialize HashMap used for partition/timestamp mapping
                timestampsToSearch = new HashMap<>();
                partitions.forEach(p -> {
                    TopicPartition topicPartition = p.topicPartition();

                    log.info("Topic:{} Partition:{} Offset:{}", topicPartition.topic(), topicPartition.partition(), p.position());

                    long lastProcessedRecordTimestamp = spConsumerTimestampTracker.getLastProcessedTimestamp(topicPartition.topic(),topicPartition.partition());
                    log.info("lastProcessedRecordTimestamp:{}", lastProcessedRecordTimestamp);

                    if(lastProcessedRecordTimestamp==-1) {
                        //No previous timestamp found set it to current time
                        timestampsToSearch.put(topicPartition,currentTime);
                    } else {
                        timestampsToSearch.put(topicPartition,lastProcessedRecordTimestamp);
                    }
                });
                log.info("PARTITIONS ASSIGNED -> END");
            })
            .addRevokeListener(partitions -> {
                log.debug("onPartitionsRevoked {}", partitions);
                log.info("PARTITIONS REVOKED -> START");
                partitions.forEach(p -> {
                    TopicPartition topicPartition = p.topicPartition();
                    log.info("Topic:{} Partition:{} Offset:{}", topicPartition.topic(), topicPartition.partition(), p.position());
                });
                log.info("PARTITIONS REVOKED -> END");
            })
            .subscription(topics);

        Flux<ReceiverRecord<Integer, String>> kafkaFlux = KafkaReceiver.create(options).receive();

        KafkaReceiver.create(options)
                .doOnConsumer(consumer ->  {
                        log.info("doOnConsumer:START");
                        // Get the offsets for each partition
                        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(timestampsToSearch);
                        // Seek to the specified offset for each partition
                        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
                            log.info("Consumer is seeking to {} offset for topic {} & partition {}", entry.getValue().offset(), entry.getKey().topic(), entry.getKey().partition());
                            consumer.seek(entry.getKey(), entry.getValue().offset());
                        }
                        return null;
                    }).concatWith(kafkaFlux);

        return kafkaFlux
                .map(record -> {
                    messageProcessor.processMessage(record);
                    record.receiverOffset().commit();
                    log.info("Committed Kafka message from topic :{} with offset:{}", record.topic(), record.offset());
                    spConsumerTimestampTracker.addPartitionLastProcessedTimestamp(record.topic(),record.partition(),record.timestamp());
                    return record;
                })
                .subscribe(record -> log.info("Consumer successfully subscribed from topic :{} with offset:{}", record.topic(), record.offset()));
}

Любая помощь очень ценится. Заранее спасибо.

...