passThrough
- это дополнительное значение, которое становится доступным для использования в ProducerMessage.Results
* passThrough()
.
Как указано в официальной документации , лучшие и наиболееШироко распространенной практикой является использование этого значения в качестве транспорта для передачи CommittableOffset
в нисходящий поток Committer.Sink
.
Это кажется удобным в тех случаях, когда вы используете CommittableSource
вместе с опцией enable.auto.commit
Kafka для потребителей, установленной на false
. В таких случаях вы должны фиксировать смещения Kafka вручную при их обработке. В частности, я использую такой подход при чтении записей из одной темы Кафки, обработке их и последующей отправке в другую тему Кафки.
Вот простой пример использования:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicIn))
.mapAsync { msg ⇒
process(msg.record.value).map {
response ⇒
ProducerMessage.single(
new ProducerRecord(topicOut, msg.record.key, response),
passThrough = msg.committableOffset
)
}
}
.via(Producer.flexiFlow(producerSettings))
.map(_.passThrough) // extract the passThrough
.toMat(Committer.sink(committerSettings))(Keep.both) // commiting offsets
.mapMaterializedValue(DrainingControl.apply)
.run()