Управление потоком данных с большим разветвлением между шагами - PullRequest
0 голосов
/ 03 июля 2019

У меня есть 3 шага потока данных в конвейере потока данных.

  1. Читает из pubsub, сохраняет в таблице и разбивает на несколько событий (помещает в контекстный вывод).
  2. Для каждого разбиения запрашивает дБ и украшает событие дополнительными данными.
  3. Публикация в другой теме pubsub для дальнейшей обработки.

ПРОБЛЕМА:
После шага 1 его разбивают на события с 10K до 20K.

Теперь на шаге 2 заканчиваются соединения с базой данных. (У меня есть статический пул соединений Hikari).

Работает абсолютно нормально, будет меньше данных. Я использую машину n1-standard-32.

Что я должен сделать, чтобы ограничить ввод следующим шагом? Так что параллелизм ограничен или задушить события следующим шагом.

1 Ответ

1 голос
/ 03 июля 2019

Я думаю, что основная идея состоит в том, чтобы уменьшить параллелизм при выполнении шага 2 (Если у вас большой параллелизм, вам понадобится 20 тыс. Соединений для 20 тыс. Событий, потому что 20 тыс. Событий обрабатываются параллельно).

Идеи включают в себя:

  1. Выполнение с сохранением состояния ParDo сериализуется для каждого ключа на окно, что означает, что для состояния с сохранением ParDo требуется только одно соединение, потому что только один элемент должен обрабатываться в данный момент времени для ключа и окна.

  2. Одно соединение в комплекте.Вы можете инициализировать соединение в startBundle и сделать так, чтобы элементы в одном и том же пакете использовали одно и то же соединение (если я правильно понимаю, что внутри пакета выполнение, вероятно, сериализовано).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...