Мы определили базового подписчика, который пропускает пропущенные сообщения (т. Е. По некоторым причинам бизнес-логики, которые мы не собираемся обрабатывать), выдавая исключение и полагаясь на контроль потока Akka Streams для возобновления Flow
:
someLagomService
.someTopic()
.subscribe
.withGroupId("lagom-service")
.atLeastOnce(
Flow[Int]
.mapAsync(1)(el => {
// Exception may occur here or can map to Done
})
.withAttributes(ActorAttributes.supervisionStrategy({
case t =>
Supervision.Resume
})
)
Похоже, что это нормально работает для базовых сценариев использования при очень небольшой нагрузке, но мы заметили очень странные вещи для большего количества сообщений (например: очень частая повторная обработка сообщений и т. Д.).
Копаясь в коде, мы увидели, что broker.Subscriber.atLeastOnce
документация Lagom гласит:
flow
может вытянуть больше элементов из восходящего потока, но он должен излучать
ровно одно Done
сообщение для каждого полученного сообщения. Это должно
также отправлять их в том же порядке, в котором были получены сообщения. это
означает, что flow
не должен фильтровать или собирать подмножество
сообщения, вместо этого он должен разделить сообщения на отдельные потоки и
сопоставьте те, которые были бы сброшены до Done
.
Кроме того, в значении KafkaSubscriberActor
от Lagom мы видим, что значение частного atLeastOnce
, по существу, распаковывает полезную нагрузку и смещение сообщения, а затем снова архивирует, а затем выполняет резервное копирование после того, как наш поток пользователей отображает сообщения в Done
.
Эти два лакомых кусочка, по-видимому, означают, что при использовании потоковых супервизоров и пропускающих элементов мы можем оказаться в ситуации, когда смещения для фиксации больше не будут равномерно увеличиваться с Done
s, которые должны создаваться для сообщения Kafka.
Пример: если мы передадим 1, 2, 3, 4 и отобразим 1, 2 и 4 на Done
, но сгенерируем исключение на 3, у нас будет 3 Done
s и 4 смещения фиксации?
- Это правильно / ожидается? Означает ли это, что мы должны ИЗБЕЖАТЬ, используя потоковые супервизоры здесь?
- Какое поведение может быть вызвано неравномерным движением молнии?
- Каков рекомендуемый подход для обработки ошибок, когда речь идет о потреблении сообщений от Kafka через API брокера сообщений Lagom? Правильно ли делать для сопоставления / восстановления сбоев
Done
?
Использование Lagom 1.4.10