Я пытаюсь сделать несколько вариантов подключения производителя к потребителю с особым случаем, что иногда мне нужно было бы выдать 1 дополнительное сообщение на сообщение (например, 1 на выходную тему и 1 сообщение на другую тему), покасохраняя гарантии на это.
Я думал о том, чтобы выполнить mapConcat и вывести несколько объектов ProducerRecord, меня беспокоят слабые гарантии в крайнем случае, когда первого сообщения достаточно, чтобы фиксация произошла с этим смещением, что может привести к потенциальной потеревторой.Также кажется, что вы не можете просто сделать .flatmap, как если бы вы шли в API графа, который становится еще более грязным, так как становится сложнее убедиться, что после объединения в поток фиксации вы не просто игнорируете дублированное смещение.
Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
.map(msg => (msg, addLineage(msg.record.value())))
.mapConcat(input =>
if (math.random > 0.25)
List(ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
input._1.committableOffset
))
else List(ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
input._1.committableOffset
),ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, input._1.record.key(), input._2),
input._1.committableOffset
))
)
.via(Producer.flow(producerSettings))
.map(_.message.passThrough)
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
(batch, elem) => batch.updated(elem)
}
.mapAsync(parallelism = 3)(_.commitScaladsl())
.runWith(Sink.ignore)
Оригинальная документация 1 к 1 находится здесь: https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer
Кто-нибудь задумывался / решил эту проблему?