Справочная информация:
У нас есть конвейер Flink, который состоит из нескольких источников, нескольких приемников и нескольких операторов вдоль конвейера, которые также обновляют базы данных.
Радии, чтобы упростить его, давайте предположим, что у нас есть конвейер, который выглядит так:
Source -> KeyBy -> FlatMap -> Filter -> Sink
Этот конвейер должен позволять нам прослушивать уведомления об изменениях в некоторых данных. (Каждое уведомление содержит идентификатор). Для каждого уведомления мы читаем данные из БД, запускаем алгоритм и обновляем одну и ту же строку БД. После этого мы также излучаем величину изменения данных. Только если величина изменения данных достаточно велика, мы отправляем уведомление в другую тему Кафки.
- Источник подписывается на тему Кафки для прослушивания уведомлений об измененных идентификаторах данных.
- KeyBy вводит идентификатор по идентификатору, чтобы убедиться, что один и тот же идентификатор не обрабатывается двумя экземплярами операторов одновременно.
- При наличии идентификатора FlatMap считывает данные из БД, запускает алгоритм иобновляет ту же строку БД. Он излучает величину изменения. Это FlatMap, а не Map, потому что в некоторых случаях мы не хотим выдавать какие-либо величины изменения, например, если у нас были какие-то конкретные ошибки.
- Фильтр фильтрует поток по величинам меньше некоторого порога
- Мойка отправляет отфильтрованные уведомления в другую тему Kafka.
Вопрос:
Мы хотим запустить конвейер с семантика ровно один раз . Из того, что мы видим, Flink поддерживает семантику "точно один раз" для источников Kafka, для приемников Kafka и для операторов с сохранением состояния или состояний в середине. Мы не смогли найти место, объясняющее, как это сделать ровно один раз с ресурсами, которые вы обновляете по конвейеру. Существует TwoPhaseCommitSinkFunction , которая позволяет создать функцию приемника, которая обеспечивает семантику с точностью до одного раза.
Мы не можем использовать ее, потому что хотим обновить базу данных и после этого отправлять уведомление об изменении вКафка. Выполнение этого в 2 отдельных приемниках создаст условия гонки, в которых мы можем получить уведомление о величине до того, как БД будет фактически обновлена.
Мы что-то упустили? Есть ли способ реализовать двухфазные коммиты в операторах Map / FlatMap? Есть ли другое решение?
Спасибо!