Изящно перезапустите Поток Потребителя Reactive-Kafka при сбое - PullRequest
0 голосов
/ 22 мая 2018

Проблема Когда я перезапускаю / завершаю / останавливаю поток, старый потребитель не умирает / выключается:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

Описание Я создаю службу, котораяполучает сообщение из темы Kafka и отправляет сообщение во внешнюю службу по HTTP-запросу.

  1. Соединение с внешней службой может быть разорвано, и моя служба должна повторить запрос.

  2. Кроме того, в случае ошибки в потоке необходимо перезапустить весь поток.

  3. Наконец, иногда мне не нужен поток и соответствующий ему Kafka-потребитель, и я хотел бы закрыть весь поток

Итак, у меня есть поток:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run

Http-запрос отправляется в sourceFunction

Я следовал новым инструкциям по перезапуску Kafka Consumer в новой документации

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
                  streamComplete
                    .flatMap { _ =>
                      consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
                    }
                    .recoverWith {
                      case _ =>
                        consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
                    }
             }
            .flatMapConcat(sourceFunction)
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run

Там открыт вопрос, в котором обсуждается этот не завершающий Потребитель в сложном Akka-потоке, но пока нет решения.

Есть ли обходной путь, который вызывает завершение Kafka Consumer

1 Ответ

0 голосов
/ 23 мая 2018

Как насчет упаковки потребителя в Actor и регистрации KillSwitch, см .: https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

Затем в методе Actor postStop вы можете завершить поток.Обернув Actor в BackoffSupervisor, вы получите экспоненциальный откат.

Пример субъекта: https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27

...