У меня есть потоковый конвейер, который использует потоковый источник (Kafka) и записывает данные в базу данных CloudSQL. Цель состоит в том, чтобы обновлять базу данных CloudSQL в режиме реального времени с агрегацией суммы ключ / значение записей, полученных за последний час.
например. За последний час было получено 3 записи с KV <001,3>
, <001,4>
, <001,2>
, в базе данных должна быть запись 001, 9
. Записи старше часа не включены.
Мое текущее решение - SlidingWindow в GroupByKey после KafkaIO.read
:
.apply(Window.into(SlidingWindows
.of(Duration.standardSeconds(3600))
.every(Duration.standardSeconds(20)))
).apply(GroupByKey.create())
За ним следует ParDo, который суммирует каждый ключ, а затем обновляет базу данных SQL.
В результате каждые 20 секунд обновляется моя база данных CloudSQL с агрегацией каждого ключа за последний час, что соответствует функциональным требованиям. Проблема заключается в количестве обращений к CloudSQL, которое это вызывает: большинство выходных данных KV идентичны предыдущему окну, поэтому каждое окно каждые 20 секунд запускает транзакции на несколько часов (~ 500 тыс.).
Имеет смысл запускать каждый выход KV только тогда, когда используется запись с этим ключом, или избегать вывода KV, которые не изменились со времени предыдущего окна. Или какой-то фильтр перед вставкой в CloudSQL, который принимает все и выводит только измененные KV. Возможно ли это или есть другое решение?