Обновление базы данных с помощью оконных агрегатов в реальном времени - PullRequest
0 голосов
/ 30 мая 2019

У меня есть потоковый конвейер, который использует потоковый источник (Kafka) и записывает данные в базу данных CloudSQL. Цель состоит в том, чтобы обновлять базу данных CloudSQL в режиме реального времени с агрегацией суммы ключ / значение записей, полученных за последний час.

например. За последний час было получено 3 записи с KV <001,3>, <001,4>, <001,2>, в базе данных должна быть запись 001, 9. Записи старше часа не включены.

Мое текущее решение - SlidingWindow в GroupByKey после KafkaIO.read:

                    .apply(Window.into(SlidingWindows
                                .of(Duration.standardSeconds(3600))
                                  .every(Duration.standardSeconds(20)))
                    ).apply(GroupByKey.create())

За ним следует ParDo, который суммирует каждый ключ, а затем обновляет базу данных SQL.

В результате каждые 20 секунд обновляется моя база данных CloudSQL с агрегацией каждого ключа за последний час, что соответствует функциональным требованиям. Проблема заключается в количестве обращений к CloudSQL, которое это вызывает: большинство выходных данных KV идентичны предыдущему окну, поэтому каждое окно каждые 20 секунд запускает транзакции на несколько часов (~ 500 тыс.).

Имеет смысл запускать каждый выход KV только тогда, когда используется запись с этим ключом, или избегать вывода KV, которые не изменились со времени предыдущего окна. Или какой-то фильтр перед вставкой в ​​CloudSQL, который принимает все и выводит только измененные KV. Возможно ли это или есть другое решение?

1 Ответ

0 голосов
/ 30 мая 2019

Одним из возможных способов изучения было бы использование API-интерфейса State после агрегации скользящего окна.

Однако элементы, поступающие в this, неупорядочены, поэтому вы не можете просто сохранить элемент и сравнить его с входящим значением.

  • Вам нужно будет добавить каждый элемент в DoFn в BagState (как значение с меткой времени).
  • Установите таймер, а затем во время функции OnTimer () прочитайте все элементы в bagstate, отсортируйте их и выведите нужные значения.Вам также нужно будет сохранить значение max (timestamp) в объекте ValueState, чтобы вы могли использовать его при следующем вызове OnTimer.

Размер окна, которое вы используете для State API DoFn, будет произвольным, чем он больше, тем меньше ненужных переходов.С другой стороны, чем больше окно, тем больше ключей вы будете хранить в ValueState, которые могут больше не понадобиться.Избегайте использования глобального окна, так как для этого потребуется встроить функцию GC, так как срок действия окна никогда не истечет, а пространство клавиш будет расти вечно.

...