У нас есть тема PubSub с событиями, погружающимися в BigQuery (хотя конкретная БД здесь почти не имеет значения).События могут иметь новые неизвестные свойства, которые в конечном итоге должны стать отдельными столбцами BigQuery.
Итак, в основном у меня есть два вопроса:
- Как правильно поддерживать глобальныйсостояние внутри конвейера (с набором обнаруженных свойств в моем случае)?
- Что было бы хорошей стратегией для буферизации / удержания потока событий, как только будет найдено новое свойство и пока не будет выполнено
ALTER TABLE
Прямо сейчас я попытался использовать следующее (я использую Spotify scio):
rows
.withFixedWindows(Duration.millis(duration))
.withWindow[IntervalWindow]
.swap
.groupByKey
.map { case (window, rowsIterable) =>
val newRows = findNewProperties(rowsIterable)
mutateTableWith(newRows)
rowsIterable
}
.flatMap(id)
.saveAsBigQuery()
Но это ужасно неэффективно, поскольку нам по крайней мере нужно загрузить целые rowsIterable
в памятьи даже пройти его.