Я ищу сообщения с определенными смещениями, используя реактор Кафки. Основываясь на интерфейсе KafkaReceiver
, мы можем выполнить некоторые действия для потребителя, связанные с KafkaReceiver, вызвав <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);
Вот пример того, как я его использую:
KafkaReceiver.create(
receiverOptions
.subscription(setOf(topicName))
)
.doOnConsumer { consumer ->
consumer.partitionsFor(topicName)
.map { partitionInfo -> Pair(TopicPartition(topicName, partitionInfo.partition()), Instant.parse("2019-10-27-T00:00:00Z").toEpochMilli()) }
.toMap()
.let(consumer::offsetsForTimes)
}
.flatMapMany { offsetsForTimes ->
val options = receiverOptions
.subscription(setOf(topicName))
.addAssignListener { partitions ->
partitions.map { partion ->
offsetsForTimes[partion.topicPartition()]
?.offset()
?.let(partion::seek)
}
}
KafkaReceiver.create(options)
.receiveAutoAck()
}
.flatMap { it }
.map { record -> doingSomething() }
.onErrorContinue { exception, _ -> logger.error(exception)}
.doOnNext { data -> logger.info("ok ", data)}
.subscribe()
Когда я запускаю приложение, я получаю исключение nullPointerException
...
Caused by: reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
...
Может кто-нибудь сказать мне, как использовать записи с определенными смещениями, используя kafkaреактор? используя kafka offsetsForTimes. Спасибо.
Версия Kafka Reactor: 1.2.0. ВЫПУСКВерсия для весенней загрузки: 2.1.9.RELEASE