Как безопасно пропустить сообщения, используя Lagom Kafka Message Broker API? - PullRequest
2 голосов
/ 11 марта 2019

Мы определили базового подписчика, который пропускает пропущенные сообщения (т. Е. По некоторым причинам бизнес-логики, которые мы не собираемся обрабатывать), выдавая исключение и полагаясь на контроль потока Akka Streams для возобновления Flow:

someLagomService
  .someTopic()
  .subscribe
  .withGroupId("lagom-service")
  .atLeastOnce(
    Flow[Int]
      .mapAsync(1)(el => {
        // Exception may occur here or can map to Done
      })
      .withAttributes(ActorAttributes.supervisionStrategy({
        case t =>
          Supervision.Resume
      })
  )

Похоже, что это нормально работает для базовых сценариев использования при очень небольшой нагрузке, но мы заметили очень странные вещи для большего количества сообщений (например: очень частая повторная обработка сообщений и т. Д.).

Копаясь в коде, мы увидели, что broker.Subscriber.atLeastOnce документация Lagom гласит:

flow может вытянуть больше элементов из восходящего потока, но он должен излучать ровно одно Done сообщение для каждого полученного сообщения. Это должно также отправлять их в том же порядке, в котором были получены сообщения. это означает, что flow не должен фильтровать или собирать подмножество сообщения, вместо этого он должен разделить сообщения на отдельные потоки и сопоставьте те, которые были бы сброшены до Done.

Кроме того, в значении KafkaSubscriberActor от Lagom мы видим, что значение частного atLeastOnce, по существу, распаковывает полезную нагрузку и смещение сообщения, а затем снова архивирует, а затем выполняет резервное копирование после того, как наш поток пользователей отображает сообщения в Done.

Эти два лакомых кусочка, по-видимому, означают, что при использовании потоковых супервизоров и пропускающих элементов мы можем оказаться в ситуации, когда смещения для фиксации больше не будут равномерно увеличиваться с Done s, которые должны создаваться для сообщения Kafka.

Пример: если мы передадим 1, 2, 3, 4 и отобразим 1, 2 и 4 на Done, но сгенерируем исключение на 3, у нас будет 3 Done s и 4 смещения фиксации?

  • Это правильно / ожидается? Означает ли это, что мы должны ИЗБЕЖАТЬ, используя потоковые супервизоры здесь?
  • Какое поведение может быть вызвано неравномерным движением молнии?
  • Каков рекомендуемый подход для обработки ошибок, когда речь идет о потреблении сообщений от Kafka через API брокера сообщений Lagom? Правильно ли делать для сопоставления / восстановления сбоев Done?

Использование Lagom 1.4.10

1 Ответ

0 голосов
/ 11 марта 2019

Это правильно / ожидается?Означает ли это, что нам следует ИЗБЕЖАТЬ, используя потоковые супервизоры здесь?

Официальная документация API говорит, что

Если модуль брокера сообщений Kafka Lagom находится в процессеиспользуется, то по умолчанию поток автоматически перезапускается при возникновении сбоя.

Таким образом, нет необходимости добавлять собственный supervisionStrategy для управления обработкой ошибок.И поток будет перезапущен по умолчанию, и вам не следует думать о «пропущенных» сообщениях «Готово».


Какого рода поведение может вызывать неравномерное сжатие?

Именно поэтому в документации сказано:

Это означает, что поток не должен фильтровать или собирать подмножество сообщений

Он может недооценивать неправильное смещение,И при перезапуске вы можете получить уже обработанные сообщения в форме воспроизведения из зафиксированного нижнего смещения.


Какой рекомендуемый подход для обработки ошибок, когда речь идет о потреблении сообщений от Kafkaчерез API брокера сообщений Lagom?Правильно ли выполнить сопоставление / восстановление сбоев в Done?

Lagom заботится об обработке исключений, удаляя сообщение, вызвавшее ошибку, и перезапуская поток.И сбои карты / восстановления в Done не изменятся.

Вы могли бы рассмотреть, в случае, если вам понадобится доступ к этим сообщениям позже, также используйте, например, Try {}, то есть не выбрасывая исключение, и собирайте сообщения с ошибками, отправляя их в другую тему., это даст вам возможность отслеживать количество ошибок и воспроизводить сообщения, которые вызвали ошибку, когда условия выполняются правильно, т.е. ошибка устранена.

...