Управляемая / ручная обработка ошибок / восстановления в потоковых приложениях - PullRequest
0 голосов
/ 30 января 2019

Я работаю над приложением на основе Apache Flink, которое использует Apache Kafka для ввода и вывода.Возможно, это приложение будет перенесено на Apache Spark, поэтому я также добавил это как тег, и вопрос остался прежним.

У меня есть требование, что все входящие сообщения, полученные через kafka, должны обрабатываться впорядок, а также безопасно храниться в постоянном слое (базе данных), и ни одно сообщение не должно быть потеряно.

Потоковая часть в этом приложении довольно тривиальна / мала, так как основная логика сводится кчто-то вроде:

environment.addSource(consumer)    // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)             // 2) discard unparsable messages
  .map(_.get)                      // 3) unwrap Option
  .map(InputEvent.fromXml(_))      // 4) convert from XML to internal representation
  .keyBy(_.id)                     // 5) assure in-order processing on logical-key level
  .map(new DBFunction)             // 6) database lookup, store of update and additional enrichment
  .map(InputEvent.toXml(_))        // 7) convert back to XML
  .addSink(producer)               // 8) attach kafka producer sink

Теперь, во время этого конвейера, может произойти несколько ошибок :

  • база данных становится недоступной (завершение работы, заполнение табличного пространства,...)
  • изменения не могут быть сохранены из-за логических ошибок (из формата столбца)
  • производитель kafka не может отправить сообщение из-за недоступности брокера

и, возможно,другие ситуации.

Теперь мой вопрос: как я могу обеспечить последовательность в соответствии с вышеизложенным в тех ситуациях, когда мне фактически пришлось бы это сделатьчто-то вроде:

  1. Stream-Operator 6) обнаруживает проблему (БД недоступна)
  2. Соединение с БД объекта DBFunction должно быть восстановлено, что может быть успешно выполнено только после некоторогоминут
  3. Это означает, что общая обработка должна быть приостановлена, в лучшем случае для всего конвейера, чтобы входящие сообщения были загружены в память много
  4. Возобновить обработку после восстановления базы данных.Обработка должна возобновиться точно с сообщения, в котором возникла проблема: 1)

Теперь я знаю, что есть как минимум 2 инструмента для обработки ошибок:

  1. kafka потребителя смещения
  2. контрольные точки apache flink

Однако, просматривая документы, я не вижу, как можно использовать любой из них в процессе обработки потока из одного оператора.

Итак, каковы рекомендуемые стратегии для детальной обработки ошибок и восстановления в потоковом приложении?

1 Ответ

0 голосов
/ 06 марта 2019

Несколько моментов:

KeyBy не поможет обеспечить обработку заказа.Во всяком случае, он может чередовать события из разных разделов Kafka (которые могли быть в порядке в каждом разделе), создавая, таким образом, упорядоченность там, где его раньше не было.Трудно более конкретно прокомментировать, как вы можете гарантировать обработку заказов, не понимая, сколько экземпляров FlinkKafkaConsumer вы собираетесь использовать, сколько разделов будет потреблять каждый из них, как ключи распределены по разделам Kafka и почему вы думаетеkeyBy необходим - но если вы все настроите правильно, сохранение порядка может быть достигнуто. reinterpretAsKeyedStream может быть полезным здесь, но эту функцию трудно понять и сложно использовать правильно.

Вы можете использовать AsyncFunction от Flink для управления соединением с внешнимБД в отказоустойчивом, ровно одномоментном порядке.

Flink не поддерживает детальное восстановление на систематической основе - его контрольные точки являются глобальными моментальными снимками состояния всего распределенного кластера и предназначеныдля использования в процессе восстановления в качестве монолитного, самосогласованного снимка.Если ваше задание не выполняется, обычно единственным выходом является перезапуск с контрольной точки, который будет включать в себя перематывание входных очередей (со смещениями, сохраненными в контрольной точке), повторное воспроизведение событий после этих смещений, повторный запуск поиска в БД (что является асинхронной функцией).будет делать это автоматически) и используя транзакции kafka для достижения сквозной семантики.Однако в случае смущающих параллельных заданий иногда можно воспользоваться преимуществом мелкозернистого восстановления .

...