Bigquery, как удалить записи из потока данных - PullRequest
0 голосов
/ 19 марта 2020

Мне нужно обновить и удалить записи в BigQuery из потока данных. Данные поступают из Pubsub и поставляются с флагом, который определяет действие Вставить, Обновить, Удалить (I, U, D). Вставить не проблема.

Есть предложения по обновлению и удалению?

1 Ответ

1 голос
/ 20 марта 2020

IOs в потоке данных предпочитают делать слепые записи (в идеале в отдельную таблицу), поэтому ввод сохраняется. Это обеспечивает более стабильную и высокопроизводительную работу и предотвращает изменение входных данных конвейера.

Вставки, конечно, можно выполнить, просто используя BigQueryIO.Write

Для обновлений, если вы просто хотите перезаписать предыдущая строка в таблице. Тогда вы можете просто написать TableRow, используя BigQueryIO.Write.

Если хотите прочитать, изменить, записать строку (т.е. увеличить значение в строке на X). Тогда это будет включать чтение данных. Это сложнее. Вот несколько подходов к этому (для целей этого вы также можете рассматривать удаление как своего рода обновление. Т.е. может быть добавлена ​​строка, указывающая, что все записи для этого «ключа» следует отбросить).

Подход 1 Записать каждое обновление в свою собственную строку

См. Ответ здесь, вы просто записываете каждое обновление (без чтения) в строку и вычисляете агрегированное значение, когда Вы выполняете запрос SQL. В зависимости от желаемых обновлений это может хорошо работать с вашим запросом SQL BQ.

Поток данных Google: вставка + обновление в BigQuery в конвейере потоковой передачи

Подход 2: Использование потокового состояния потока данных

При таком подходе вы можете создать StatefulParDo , который, по сути, позволяет вам сохранять некоторое постоянное состояние в потоке данных для каждого ключа потока данных для конкретный ParDo. Таким образом, вы можете сохранить здесь значение, обновлять его на основе ваших событий pubsub и выдавать элемент, который перезаписывает строку в выходном BigQueryTable с новым TableRow.

Если у вас есть существующая таблица BQ, которая может потребоваться чтобы инициализировать StaefulParDo, используя BigQueryIO.Read и PubsubIO.Read.

С этим состоянием потоковой передачи данных связано определенное ценообразование , о котором следует помнить.

При обновлении конвейера потока данных эти данные также могут быть сохранены.

Подход 3 Напишите таблицу журнала и вычислите требуемую итоговую таблицу

В этом подходе вы можете записать запись в таблицу для каждого события (независимо от того, является ли это вставкой, обновлением или удалением.

Затем прочитайте другой конвейер в этой таблице BigQuery, агрегируйте все строки с одним и тем же «ключом». Хотя вы будете необходимо использовать окно для агрегирования данных. Таким образом, вы сможете рассчитать актуальную запись для всех событий для ключа в одном и том же месте г, день и т. д. 1055 *. Использование этого окна.

Подход 4 Использование API BigQuery напрямую (не рекомендуется)

Создание собственного ParDo, который вручную читает, изменяет и записывает каждое обновление BigQuery с помощью API BigQuery прямо из ParDo (вместо использования BigqueryIO).

Не рекомендуется, так как при чтении, изменении и записи RPC каждая строка будет очень медленной из-за времени кругового обхода RP C. Это также может привести к нежелательным побочным эффектам, т. Е. В случае повторной обработки данных при повторных попытках потока данных обновления могут применяться несколько раз. Но это может сработать, если у вас все в порядке с неточностью и у вас небольшой объем данных.

Альтернативные подходы к этому включают в себя: Использование ParDo с потоковым состоянием (Это позволит вам обновить значение для ключа в Поток данных)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...