Реакторы Кафки, потребляющие записи с конкретными смещениями - PullRequest
1 голос
/ 28 октября 2019

Я ищу сообщения с определенными смещениями, используя реактор Кафки. Основываясь на интерфейсе KafkaReceiver, мы можем выполнить некоторые действия для потребителя, связанные с KafkaReceiver, вызвав <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);Вот пример того, как я его использую:

        KafkaReceiver.create(
                receiverOptions
                        .subscription(setOf(topicName))
        )
                .doOnConsumer { consumer ->
                    consumer.partitionsFor(topicName)
                            .map { partitionInfo -> Pair(TopicPartition(topicName, partitionInfo.partition()), Instant.parse("2019-10-27-T00:00:00Z").toEpochMilli()) }
                            .toMap()
                            .let(consumer::offsetsForTimes)
                }
                .flatMapMany { offsetsForTimes ->
                    val options = receiverOptions
                            .subscription(setOf(topicName))
                            .addAssignListener { partitions ->
                                partitions.map { partion ->
                                    offsetsForTimes[partion.topicPartition()]
                                            ?.offset()
                                            ?.let(partion::seek)
                                }
                            }
                    KafkaReceiver.create(options)
                            .receiveAutoAck()
                }
                .flatMap { it }
                .map { record -> doingSomething() }
                .onErrorContinue { exception, _ -> logger.error(exception)}
                .doOnNext { data -> logger.info("ok ", data)}
                .subscribe()

Когда я запускаю приложение, я получаю исключение nullPointerException

...
Caused by: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
...

Может кто-нибудь сказать мне, как использовать записи с определенными смещениями, используя kafkaреактор? используя kafka offsetsForTimes. Спасибо.

Версия Kafka Reactor: 1.2.0. ВЫПУСКВерсия для весенней загрузки: 2.1.9.RELEASE

...