После нашего обсуждения в разделе комментариев я заметил, что вы не используете правильные команды для выполнения конвейера DataFlow.
Согласно документации существуют обязательные флаги, которые должны быть определены для запуска конвейера в Управляемая служба потока данных . Эти флаги описаны ниже:
имя-задания - Имя выполняемого задания потока данных.
project - Идентификатор вашего проекта Google Cloud. runner - конвейер
runner - , который проанализирует вашу программу и создаст ваш конвейер. Для облачного выполнения это должен быть DataflowRunner.
staging_location - Путь облачного хранилища для потока данных для создания пакетов кода, необходимых работникам, выполняющим работу.
temp_location - Путь облачного хранилища для потока данных для размещения временных файлов заданий, созданных во время выполнения конвейера.
Помимо этих флагов, в вашем случае вы можете использовать и другие, поскольку вы используете PubSub topi c:
- - input_topi c: задает входные данные Pub / Sub topi c для чтения сообщений из.
Поэтому пример запуска конвейера потока данных будет следующим:
python RunPipelineDataflow.py \
--job_name=jobName\
--project=$PROJECT_NAME \
--runner=DataflowRunner \
--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY\
--temp_location=gs://$BUCKET_NAME/temp
--input_topic=projects/$PROJECT_NAME/topics/$TOPIC_NAME \
Я хотел бы указать на важность использования DataflowRunner , он позволяет вам использовать управляемый сервис Cloud Dataflow, предоставляя полностью управляемый сервис, автоматическое масштабирование и динамическую перебалансировку работы c. Тем не менее, также возможно использовать DirectRunner , который выполняет ваш конвейер на вашей машине, он предназначен для проверки конвейера.