Для чего используется проход в соединителе alpakka-kafka при создании сообщений? - PullRequest
0 голосов
/ 27 октября 2019

Это код для создания единственного сообщения для Кафки, указанный в документе https://doc.akka.io/docs/alpakka-kafka/current/producer.html

val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.single(
    new ProducerRecord("topicName", key, value),
    passThrough
  )

Не могли бы вы объяснить, для чего используется passThrough?

1 Ответ

0 голосов
/ 31 октября 2019

passThrough - это дополнительное значение, которое становится доступным для использования в ProducerMessage.Results * passThrough().

Как указано в официальной документации , лучшие и наиболееШироко распространенной практикой является использование этого значения в качестве транспорта для передачи CommittableOffset в нисходящий поток Committer.Sink.

Это кажется удобным в тех случаях, когда вы используете CommittableSource вместе с опцией enable.auto.commit Kafka для потребителей, установленной на false. В таких случаях вы должны фиксировать смещения Kafka вручную при их обработке. В частности, я использую такой подход при чтении записей из одной темы Кафки, обработке их и последующей отправке в другую тему Кафки.

Вот простой пример использования:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topicIn))
      .mapAsync { msg ⇒
        process(msg.record.value).map {
          response ⇒
            ProducerMessage.single(
              new ProducerRecord(topicOut, msg.record.key, response),
              passThrough = msg.committableOffset
            )
        }
      }
      .via(Producer.flexiFlow(producerSettings))
      .map(_.passThrough) // extract the passThrough
      .toMat(Committer.sink(committerSettings))(Keep.both) // commiting offsets
      .mapMaterializedValue(DrainingControl.apply)
      .run()
...