Google Dataflow: считывание несвязанной коллекции ПК из Google Cloud Storage - PullRequest
0 голосов
/ 27 февраля 2020

У меня есть конвейер, который передает JSON сообщения из PubSub (Unbound PCollection) в Google Cloud Storage. Каждый файл должен содержать несколько JSON объектов, по одному на строку.

Я хочу создать еще один конвейер, который должен читать все JSON объекты из этого сегмента GCS для дальнейшей потоковой обработки, Самое главное, что этот второй конвейер должен работать как поток, а не как пакет. Значит, я хочу, чтобы он «слушал» корзину и обрабатывал каждый JSON записанный в нее объект. Unbound PCollection.

Есть ли способ добиться такого поведения?

Спасибо

Ответы [ 2 ]

2 голосов
/ 28 февраля 2020

Процесс потоковой передачи работает только с источником данных PubSub. Но не волнуйтесь, вы можете достичь своего конвейера.

  • Создайте подписку на уведомление для события publi sh на PubSub
  • Создайте конвейер, который прослушивает сообщение PubSub.
    • Когда приходит сообщение, прочитайте его и получите URI файла
    • Используйте API хранилища для чтения файла и вставьте каждую строку в конвейер
    • Продолжайте свой конвейер с каждым расшифрованная строка.
0 голосов
/ 10 марта 2020

Другой пользователь дал хороший ответ, который говорит вам, как делать то, что вы хотите, однако, если я правильно понимаю вашу проблему, я думаю, что могу рекомендовать более чистый подход.

Если предположить, что верно следующее, то Вы хотите:

  • Принять сообщение Pub / Sub в начале вашего конвейера
  • Обработать сообщение, поместив его в большое окно сообщений
  • Запись каждое окно сообщений в корзину GCS в виде файла
  • Помимо окон, описанных выше, обрабатывайте каждую строку по-другому

Затем вы можете вместо этого создать один конвейер, который просто разветвляется после шага «принять публикацию / подписку». Поток данных поддерживает это изначально очень хорошо. Вы бы сохранили ссылку на объект PCollection, возвращаемый при использовании приемника Pub / Sub в начале вашего конвейера. Затем вы должны применить несколько цепочек DoFn реализаций et c к этой единственной ссылке. Вы сможете выполнять управление окнами с записью в GCS, как сейчас, плюс обрабатывать каждое отдельное сообщение любым удобным для вас способом.

Это может выглядеть так:

Pipeline pipeline = Pipeline.create(options);

PCollection<String> messages = pipeline.apply("Read from Pub/Sub", PubsubIO.readStrings().fromTopic("my_topic_name));

// Current pipeline fork for windowing into GCS
messages.apply("Handle for GCS", ...);

// New fork for more handling
messages.apply("More stuff", ...);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...