Правильный способ программно остановить поток Alpakka Kafka - PullRequest
5 голосов
/ 07 марта 2019

Мы пытаемся использовать Akka Streams с Alpakka Kafka для потребления потока событий в сервисе.Для обработки ошибок обработки событий мы используем автокоммит Kafka и несколько очередей.Например, если у нас есть тема user_created, которую мы хотим использовать в службе продуктов, мы также создаем user_created_for_products_failed и user_created_for_products_dead_letter.Эти две дополнительные темы связаны с определенной группой потребителей Kafka.Если событие не может быть обработано, оно отправляется в очередь с ошибками, где мы пытаемся использовать снова через пять минут - если оно снова не удается, оно переходит на пустые буквы.

При развертывании мы хотим убедиться, что мыне теряйте событияПоэтому мы пытаемся остановить поток перед остановкой приложения.Как я уже сказал, мы используем автокоммит, но все эти «летающие» события еще не обработаны.Как только поток и приложение остановлены, мы можем развернуть новый код и снова запустить приложение.

После прочтения документации мы увидели функцию KillSwitch.Проблема, которую мы видим в этом, состоит в том, что метод shutdown возвращает Unit вместо Future[Unit], как мы ожидаем.Мы не уверены, что не потеряем события, используя его, потому что в тестах он выглядит слишком быстро, чтобы работать должным образом.

В качестве обходного пути мы создаем ActorSystem для каждого потока и используемметод terminate (который возвращает Future[Terminate]).Проблема с этим решением заключается в том, что мы не думаем, что создание ActorSystem для каждого потока будет хорошо масштабироваться, а terminate занимает много времени для решения (в тестах это может занять до одной минуты).

Сталкивались ли вы с такой проблемой?Есть ли более быстрый способ (по сравнению с ActorSystem.terminate) остановить поток и убедиться, что все события, отправленные Source, обработаны?

1 Ответ

4 голосов
/ 07 марта 2019

Из документации (выделено мое):

При использовании внешнее смещение хранилища , вызов Consumer.Control.shutdown()достаточно для завершения Source, который начинает завершение потока.

val (consumerControl, streamComplete) =
  Consumer
    .plainSource(consumerSettings,
                 Subscriptions.assignmentWithOffset(
                   new TopicPartition(topic, 0) -> offset
                 ))
    .via(businessFlow)
    .toMat(Sink.ignore)(Keep.both)
    .run()

consumerControl.shutdown()

Consumer.control.shutdown() возвращает Future[Done].Из его описания Scaladoc:

Отключение потребителя Source.Он будет ждать завершения ожидающих запросов на смещение, прежде чем завершит работу.

В качестве альтернативы, если вы используете смещенное хранилище в Kafka , используйте Consumer.Control.drainAndShutdown, что также возвращает Future.Снова из документации (которая содержит больше информации о том, что drainAndShutdown делает под обложками):

val drainingControl =
  Consumer
    .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record).map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.both)
    .mapMaterializedValue(DrainingControl.apply)
    .run()

val streamComplete = drainingControl.drainAndShutdown()

Описание Scaladoc для drainAndShutdown:

Прекратить создавать сообщения отSource, дождитесь завершения потока и выключите потребителя Source, чтобы все использованные сообщения достигли конца потока.Сбои в завершении потока будут распространяться, источник все равно будет отключен.

...