Подключите 1 вход к n выходам с помощью Alpakka - PullRequest
0 голосов
/ 01 октября 2018

Я пытаюсь сделать несколько вариантов подключения производителя к потребителю с особым случаем, что иногда мне нужно было бы выдать 1 дополнительное сообщение на сообщение (например, 1 на выходную тему и 1 сообщение на другую тему), покасохраняя гарантии на это.

Я думал о том, чтобы выполнить mapConcat и вывести несколько объектов ProducerRecord, меня беспокоят слабые гарантии в крайнем случае, когда первого сообщения достаточно, чтобы фиксация произошла с этим смещением, что может привести к потенциальной потеревторой.Также кажется, что вы не можете просто сделать .flatmap, как если бы вы шли в API графа, который становится еще более грязным, так как становится сложнее убедиться, что после объединения в поток фиксации вы не просто игнорируете дублированное смещение.

Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
  .map(msg => (msg, addLineage(msg.record.value())))
  .mapConcat(input => 
    if (math.random > 0.25) 
      List(ProducerMessage.Message(
        new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
        input._1.committableOffset
      ))
    else List(ProducerMessage.Message(
      new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
      input._1.committableOffset
    ),ProducerMessage.Message(
      new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, input._1.record.key(), input._2),
      input._1.committableOffset
    ))
  )
  .via(Producer.flow(producerSettings))
  .map(_.message.passThrough)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
    (batch, elem) => batch.updated(elem)
  }
  .mapAsync(parallelism = 3)(_.commitScaladsl())
  .runWith(Sink.ignore)

Оригинальная документация 1 к 1 находится здесь: https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer

Кто-нибудь задумывался / решил эту проблему?

1 Ответ

0 голосов
/ 08 октября 2018

Соединитель Alpakka Kafka недавно представил flexiFlow, который поддерживает ваш сценарий использования: Пусть один элемент потока генерирует несколько сообщений для Kafka

...