Каковы различия между Consumer committableSource и plainSource? - PullRequest
0 голосов
/ 14 апреля 2019

Я пытаюсь использовать потребительскую библиотеку https://doc.akka.io/docs/alpakka-kafka/current/consumer.html метод committableSource следующим образом:

 Consumer
    .committableSource(consumerSettings, Subscriptions.topics("SAP-EVENT-BUS"))
    .map(_.committableOffset)
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run() 

Проблема в том, как получить сообщения, которые потребитель получает от Kafka?

Со следующим фрагментом кода работает:

  Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics("SAP-EVENT-BUS"))
    .to(Sink.foreach(println))
    .run() 

Весь фрагмент кода:

private implicit val materializer = ActorMaterializer()
  private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
  private val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("SAP-SENDER-GROUP")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  private val committerSettings = CommitterSettings(context.system)

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("TOPIC"))
    .map(_.committableOffset)
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

  Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics("SAP-EVENT-BUS"))
    .to(Sink.foreach(println))
    .run()

Или я должен использовать оба: один для коммита, а другой для потребления.

1 Ответ

2 голосов
/ 16 апреля 2019

Вместо Committer.sink, который завершает поток, используйте Committer.flow, который позволяет вам продолжить поток, пока вы не решите прекратить его с другим приемником.

...