Flink двухфазная фиксация для функции карты для реализации семантики точно один раз - PullRequest
0 голосов
/ 04 ноября 2019

Справочная информация:

У нас есть конвейер Flink, который состоит из нескольких источников, нескольких приемников и нескольких операторов вдоль конвейера, которые также обновляют базы данных.

Радии, чтобы упростить его, давайте предположим, что у нас есть конвейер, который выглядит так:

Source -> KeyBy -> FlatMap -> Filter -> Sink

Этот конвейер должен позволять нам прослушивать уведомления об изменениях в некоторых данных. (Каждое уведомление содержит идентификатор). Для каждого уведомления мы читаем данные из БД, запускаем алгоритм и обновляем одну и ту же строку БД. После этого мы также излучаем величину изменения данных. Только если величина изменения данных достаточно велика, мы отправляем уведомление в другую тему Кафки.

  • Источник подписывается на тему Кафки для прослушивания уведомлений об измененных идентификаторах данных.
  • KeyBy вводит идентификатор по идентификатору, чтобы убедиться, что один и тот же идентификатор не обрабатывается двумя экземплярами операторов одновременно.
  • При наличии идентификатора FlatMap считывает данные из БД, запускает алгоритм иобновляет ту же строку БД. Он излучает величину изменения. Это FlatMap, а не Map, потому что в некоторых случаях мы не хотим выдавать какие-либо величины изменения, например, если у нас были какие-то конкретные ошибки.
  • Фильтр фильтрует поток по величинам меньше некоторого порога
  • Мойка отправляет отфильтрованные уведомления в другую тему Kafka.

Вопрос:

Мы хотим запустить конвейер с семантика ровно один раз . Из того, что мы видим, Flink поддерживает семантику "точно один раз" для источников Kafka, для приемников Kafka и для операторов с сохранением состояния или состояний в середине. Мы не смогли найти место, объясняющее, как это сделать ровно один раз с ресурсами, которые вы обновляете по конвейеру. Существует TwoPhaseCommitSinkFunction , которая позволяет создать функцию приемника, которая обеспечивает семантику с точностью до одного раза.

Мы не можем использовать ее, потому что хотим обновить базу данных и после этого отправлять уведомление об изменении вКафка. Выполнение этого в 2 отдельных приемниках создаст условия гонки, в которых мы можем получить уведомление о величине до того, как БД будет фактически обновлена.

Мы что-то упустили? Есть ли способ реализовать двухфазные коммиты в операторах Map / FlatMap? Есть ли другое решение?

Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...