Я работаю над приложением на основе Apache Flink
, которое использует Apache Kafka
для ввода и вывода.Возможно, это приложение будет перенесено на Apache Spark
, поэтому я также добавил это как тег, и вопрос остался прежним.
У меня есть требование, что все входящие сообщения, полученные через kafka, должны обрабатываться впорядок, а также безопасно храниться в постоянном слое (базе данных), и ни одно сообщение не должно быть потеряно.
Потоковая часть в этом приложении довольно тривиальна / мала, так как основная логика сводится кчто-то вроде:
environment.addSource(consumer) // 1) DataStream[Option[Elem]]
.filter(_.isDefined) // 2) discard unparsable messages
.map(_.get) // 3) unwrap Option
.map(InputEvent.fromXml(_)) // 4) convert from XML to internal representation
.keyBy(_.id) // 5) assure in-order processing on logical-key level
.map(new DBFunction) // 6) database lookup, store of update and additional enrichment
.map(InputEvent.toXml(_)) // 7) convert back to XML
.addSink(producer) // 8) attach kafka producer sink
Теперь, во время этого конвейера, может произойти несколько ошибок :
- база данных становится недоступной (завершение работы, заполнение табличного пространства,...)
- изменения не могут быть сохранены из-за логических ошибок (из формата столбца)
- производитель kafka не может отправить сообщение из-за недоступности брокера
и, возможно,другие ситуации.
Теперь мой вопрос: как я могу обеспечить последовательность в соответствии с вышеизложенным в тех ситуациях, когда мне фактически пришлось бы это сделатьчто-то вроде:
- Stream-Operator 6) обнаруживает проблему (БД недоступна)
- Соединение с БД объекта
DBFunction
должно быть восстановлено, что может быть успешно выполнено только после некоторогоминут - Это означает, что общая обработка должна быть приостановлена, в лучшем случае для всего конвейера, чтобы входящие сообщения были загружены в память много
- Возобновить обработку после восстановления базы данных.Обработка должна возобновиться точно с сообщения, в котором возникла проблема: 1)
Теперь я знаю, что есть как минимум 2 инструмента для обработки ошибок:
- kafka потребителя смещения
- контрольные точки apache flink
Однако, просматривая документы, я не вижу, как можно использовать любой из них в процессе обработки потока из одного оператора.
Итак, каковы рекомендуемые стратегии для детальной обработки ошибок и восстановления в потоковом приложении?