Поддержание глобального состояния в Apache Beam - PullRequest
0 голосов
/ 31 мая 2018

У нас есть тема PubSub с событиями, погружающимися в BigQuery (хотя конкретная БД здесь почти не имеет значения).События могут иметь новые неизвестные свойства, которые в конечном итоге должны стать отдельными столбцами BigQuery.

Итак, в основном у меня есть два вопроса:

  1. Как правильно поддерживать глобальныйсостояние внутри конвейера (с набором обнаруженных свойств в моем случае)?
  2. Что было бы хорошей стратегией для буферизации / удержания потока событий, как только будет найдено новое свойство и пока не будет выполнено ALTER TABLE

Прямо сейчас я попытался использовать следующее (я использую Spotify scio):

rows
  .withFixedWindows(Duration.millis(duration))
  .withWindow[IntervalWindow]
  .swap
  .groupByKey
  .map { case (window, rowsIterable) =>
    val newRows = findNewProperties(rowsIterable)
    mutateTableWith(newRows)
    rowsIterable
  }
  .flatMap(id)
  .saveAsBigQuery()

Но это ужасно неэффективно, поскольку нам по крайней мере нужно загрузить целые rowsIterable в памятьи даже пройти его.

1 Ответ

0 голосов
/ 01 июня 2018

Мы строим тот же проект, и мы следуем этому подходу с вводом обновляющей стороны, содержащим схемы (обновляются с интервалами от BQ).Итак, в основном:

  1. При боковом вводе загрузите схемы из BQ
  2. Потоковые данные в BQ, используя потоковый режим, так что вы действительно можете сделать что-то еще со строками, которые не удалось вставить (то есть: когда у них есть новое, неизвестное свойство)
  3. Сохраните эти неисправные в другом месте (хранилище данных?) для последующей обработки (например, в другом задании)
  4. Это задание восстановления будет вызыватьизменения схемы, которые в конечном итоге будут загружены вводом со стороны обновления основного конвейера (шаг 1).

У меня есть пример работы с подходом ввода со стороны обновления здесь

...