Проблема Когда я перезапускаю / завершаю / останавливаю поток, старый потребитель не умирает / выключается:
[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
was not delivered. [1] dead letters encountered.
Описание Я создаю службу, котораяполучает сообщение из темы Kafka и отправляет сообщение во внешнюю службу по HTTP-запросу.
Соединение с внешней службой может быть разорвано, и моя служба должна повторить запрос.
Кроме того, в случае ошибки в потоке необходимо перезапустить весь поток.
Наконец, иногда мне не нужен поток и соответствующий ему Kafka-потребитель, и я хотел бы закрыть весь поток
Итак, у меня есть поток:
Consumer.committableSource(customizedSettings, subscriptions)
.flatMapConcat(sourceFunction)
.toMat(Sink.ignore)
.run
Http-запрос отправляется в sourceFunction
Я следовал новым инструкциям по перезапуску Kafka Consumer в новой документации
RestartSource.withBackoff(
minBackoff = 20.seconds,
maxBackoff = 5.minutes,
randomFactor = 0.2 ) { () =>
Consumer.committableSource(customizedSettings, subscriptions)
.watchTermination() {
case (consumerControl, streamComplete) =>
logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
streamComplete
.flatMap { _ =>
consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
}
.recoverWith {
case _ =>
consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
}
}
.flatMapConcat(sourceFunction)
}
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.left)
.run
Там открыт вопрос, в котором обсуждается этот не завершающий Потребитель в сложном Akka-потоке, но пока нет решения.
Есть ли обходной путь, который вызывает завершение Kafka Consumer