Я пытаюсь открыть непрерывный поток на Kafka из своего микросервиса, чтобы иметь возможность следить за всеми обновлениями, поступающими из других сервисов. Я попытался использовать для этого akka-stream-kafka, но обнаружил, что потребитель читает то, что уже есть в topi c, но не читает новые сообщения после подписки. Вот моя реализация.
Consumer
.committablePartitionedSource(consumerSettings, Subscriptions.topics(clientConfig.topic))
.flatMapMerge(3, _._2)
.mapAsync(1) { msg =>
val value = msg.record.value
val envelope = envelopeSerializer.decode[Envelope](value)
val response = if(processor.isDefinedAt(envelope)) processor(envelope) else Future.successful()
response.map(_ =>msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(DrainingControl.apply)
.run()
Я использую эталонную конфигурацию без изменений значений. Интересно, не хватает ли мне чего-то?
Мои версии зависимостей:
val akka = "2.5.31"
val kafka-clients = "2.4.1"
val akka-stream-kafka = "2.0.3"