Потребитель Akka-Kafka перестает читать после первой партии - PullRequest
0 голосов
/ 28 мая 2020

Я пытаюсь открыть непрерывный поток на Kafka из своего микросервиса, чтобы иметь возможность следить за всеми обновлениями, поступающими из других сервисов. Я попытался использовать для этого akka-stream-kafka, но обнаружил, что потребитель читает то, что уже есть в topi c, но не читает новые сообщения после подписки. Вот моя реализация.

Consumer
        .committablePartitionedSource(consumerSettings, Subscriptions.topics(clientConfig.topic))
        .flatMapMerge(3, _._2)
        .mapAsync(1) { msg =>
          val value = msg.record.value
          val envelope = envelopeSerializer.decode[Envelope](value)
          val response = if(processor.isDefinedAt(envelope)) processor(envelope) else Future.successful()
          response.map(_ =>msg.committableOffset)
        }
        .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
        .run()

Я использую эталонную конфигурацию без изменений значений. Интересно, не хватает ли мне чего-то?

Мои версии зависимостей:

val akka = "2.5.31"
val kafka-clients = "2.4.1"
val akka-stream-kafka = "2.0.3"
...