Как использовать watchfornewfiles в потоке данных с исходной корзиной GCS? - PullRequest
0 голосов
/ 04 июня 2018

Ссылка на элемент: Просмотр новых файлов, соответствующих шаблону файла в Apache Beam

Можете ли вы использовать это для простых случаев использования?Мой пример использования: у меня есть пользователь, который загружает данные в облачное хранилище -> конвейер (процесс csv to json) -> большой запрос.Я знаю, что Cloud Storage - это ограниченная коллекция, поэтому она представляет собой пакетный поток данных.

Я хотел бы, чтобы конвейер продолжал работать в потоковом режиме, и как только файл будет загружен в облачное хранилище, он будет обработан через конвейер.Возможно ли это с watchfornewfiles?

Я написал свой код следующим образом:

p.apply(TextIO.read().from("<bucketname>")         
    .watchForNewFiles(
        // Check for new files every 30 seconds         
        Duration.standardSeconds(30),                      
        // Never stop checking for new files
        Watch.Growth.<String>never()));

Ни одно из содержимого не пересылается в Big Query, но конвейер показывает, что он потоковый.

1 Ответ

0 голосов
/ 25 июля 2018

Вы можете использовать триггеры Google Cloud Storage здесь: https://cloud.google.com/functions/docs/calling/storage#functions-calling-storage-python

Эти триггеры используют облачные функции, аналогичные Cloud Pub / Sub, которые запускаются на объектах, если они были: созданы / удалены / архивированы /или изменение метаданных.

Эти события отправляются с использованием уведомлений Pub / Sub из облачного хранилища, но обратите внимание, чтобы не устанавливать множество функций в одном сегменте, поскольку существуют некоторые ограничения на уведомления.

Также в конце документа есть ссылка на пример реализации.

...