Я пытаюсь спроектировать поток Akka, используя Alpakka, чтобы читать события из темы kafka и помещать их в Couchbase.
Пока у меня есть следующий код, и он, кажется, работает как-то:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicIn))
.map(profile ⇒ {
RawJsonDocument.create(profile.record.key(), profile.record.value())
})
.via(
CouchbaseFlow.upsertDoc(
sessionSettings,
writeSettings,
bucketName
)
)
.log("Couchbase stream logging")
.runWith(Sink.seq)
Под «каким-то образом» я имею в виду, что поток фактически читает события из темы и помещает их в Couchbase в виде документов json, и это выглядит даже замечательно, несмотря на то, что я не понимаю, как фиксировать смещения потребителей в Kafka.
Если я четко понял основную идею, которая скрывается за смещениями потребителей Kafka, в случае какого-либо сбоя или перезапуска поток читает все сообщения из последнего зафиксированного смещения и, поскольку мы не зафиксировалиВозможно, он снова перечитывает записи, прочитанные на предыдущем сеансе.
Так я прав в своих предположениях? Если да, то как обрабатывать пользовательские коммиты в случае чтения из Kafka и публикации в какой-либо базе данных? Официальная документация Akka Streams содержит примеры, показывающие, как обращаться с такими случаями, используя обычные потоки Kafka, поэтому я понятия не имею, как зафиксировать смещения в моем случае.
Большое спасибо!