Потоковые события из Kafka в Couchbase с использованием фиксации смещения Akka Stream и Kafka - PullRequest
0 голосов
/ 18 октября 2019

Я пытаюсь спроектировать поток Akka, используя Alpakka, чтобы читать события из темы kafka и помещать их в Couchbase.

Пока у меня есть следующий код, и он, кажется, работает как-то:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topicIn))
      .map(profile ⇒ {
        RawJsonDocument.create(profile.record.key(), profile.record.value())
      })
      .via(
        CouchbaseFlow.upsertDoc(
          sessionSettings,
          writeSettings,
          bucketName
        )
      )
      .log("Couchbase stream logging")
      .runWith(Sink.seq)

Под «каким-то образом» я имею в виду, что поток фактически читает события из темы и помещает их в Couchbase в виде документов json, и это выглядит даже замечательно, несмотря на то, что я не понимаю, как фиксировать смещения потребителей в Kafka.

Если я четко понял основную идею, которая скрывается за смещениями потребителей Kafka, в случае какого-либо сбоя или перезапуска поток читает все сообщения из последнего зафиксированного смещения и, поскольку мы не зафиксировалиВозможно, он снова перечитывает записи, прочитанные на предыдущем сеансе.

Так я прав в своих предположениях? Если да, то как обрабатывать пользовательские коммиты в случае чтения из Kafka и публикации в какой-либо базе данных? Официальная документация Akka Streams содержит примеры, показывающие, как обращаться с такими случаями, используя обычные потоки Kafka, поэтому я понятия не имею, как зафиксировать смещения в моем случае.

Большое спасибо!

1 Ответ

1 голос
/ 18 октября 2019

Вам потребуется зафиксировать смещения в Couchbase, чтобы получить семантику «ровно один раз».

Это должно помочь: https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-external-to-kafka

...