IOs в потоке данных предпочитают делать слепые записи (в идеале в отдельную таблицу), поэтому ввод сохраняется. Это обеспечивает более стабильную и высокопроизводительную работу и предотвращает изменение входных данных конвейера.
Вставки, конечно, можно выполнить, просто используя BigQueryIO.Write
Для обновлений, если вы просто хотите перезаписать предыдущая строка в таблице. Тогда вы можете просто написать TableRow, используя BigQueryIO.Write.
Если хотите прочитать, изменить, записать строку (т.е. увеличить значение в строке на X). Тогда это будет включать чтение данных. Это сложнее. Вот несколько подходов к этому (для целей этого вы также можете рассматривать удаление как своего рода обновление. Т.е. может быть добавлена строка, указывающая, что все записи для этого «ключа» следует отбросить).
Подход 1 Записать каждое обновление в свою собственную строку
См. Ответ здесь, вы просто записываете каждое обновление (без чтения) в строку и вычисляете агрегированное значение, когда Вы выполняете запрос SQL. В зависимости от желаемых обновлений это может хорошо работать с вашим запросом SQL BQ.
Поток данных Google: вставка + обновление в BigQuery в конвейере потоковой передачи
Подход 2: Использование потокового состояния потока данных
При таком подходе вы можете создать StatefulParDo , который, по сути, позволяет вам сохранять некоторое постоянное состояние в потоке данных для каждого ключа потока данных для конкретный ParDo. Таким образом, вы можете сохранить здесь значение, обновлять его на основе ваших событий pubsub и выдавать элемент, который перезаписывает строку в выходном BigQueryTable с новым TableRow.
Если у вас есть существующая таблица BQ, которая может потребоваться чтобы инициализировать StaefulParDo, используя BigQueryIO.Read и PubsubIO.Read.
С этим состоянием потоковой передачи данных связано определенное ценообразование , о котором следует помнить.
При обновлении конвейера потока данных эти данные также могут быть сохранены.
Подход 3 Напишите таблицу журнала и вычислите требуемую итоговую таблицу
В этом подходе вы можете записать запись в таблицу для каждого события (независимо от того, является ли это вставкой, обновлением или удалением.
Затем прочитайте другой конвейер в этой таблице BigQuery, агрегируйте все строки с одним и тем же «ключом». Хотя вы будете необходимо использовать окно для агрегирования данных. Таким образом, вы сможете рассчитать актуальную запись для всех событий для ключа в одном и том же месте г, день и т. д. 1055 *. Использование этого окна.
Подход 4 Использование API BigQuery напрямую (не рекомендуется)
Создание собственного ParDo, который вручную читает, изменяет и записывает каждое обновление BigQuery с помощью API BigQuery прямо из ParDo (вместо использования BigqueryIO).
Не рекомендуется, так как при чтении, изменении и записи RPC каждая строка будет очень медленной из-за времени кругового обхода RP C. Это также может привести к нежелательным побочным эффектам, т. Е. В случае повторной обработки данных при повторных попытках потока данных обновления могут применяться несколько раз. Но это может сработать, если у вас все в порядке с неточностью и у вас небольшой объем данных.
Альтернативные подходы к этому включают в себя: Использование ParDo с потоковым состоянием (Это позволит вам обновить значение для ключа в Поток данных)