Мы пытаемся использовать Akka Streams с Alpakka Kafka для потребления потока событий в сервисе.Для обработки ошибок обработки событий мы используем автокоммит Kafka и несколько очередей.Например, если у нас есть тема user_created
, которую мы хотим использовать в службе продуктов, мы также создаем user_created_for_products_failed
и user_created_for_products_dead_letter
.Эти две дополнительные темы связаны с определенной группой потребителей Kafka.Если событие не может быть обработано, оно отправляется в очередь с ошибками, где мы пытаемся использовать снова через пять минут - если оно снова не удается, оно переходит на пустые буквы.
При развертывании мы хотим убедиться, что мыне теряйте событияПоэтому мы пытаемся остановить поток перед остановкой приложения.Как я уже сказал, мы используем автокоммит, но все эти «летающие» события еще не обработаны.Как только поток и приложение остановлены, мы можем развернуть новый код и снова запустить приложение.
После прочтения документации мы увидели функцию KillSwitch
.Проблема, которую мы видим в этом, состоит в том, что метод shutdown
возвращает Unit
вместо Future[Unit]
, как мы ожидаем.Мы не уверены, что не потеряем события, используя его, потому что в тестах он выглядит слишком быстро, чтобы работать должным образом.
В качестве обходного пути мы создаем ActorSystem
для каждого потока и используемметод terminate
(который возвращает Future[Terminate]
).Проблема с этим решением заключается в том, что мы не думаем, что создание ActorSystem
для каждого потока будет хорошо масштабироваться, а terminate
занимает много времени для решения (в тестах это может занять до одной минуты).
Сталкивались ли вы с такой проблемой?Есть ли более быстрый способ (по сравнению с ActorSystem.terminate
) остановить поток и убедиться, что все события, отправленные Source
, обработаны?