Как запустить поток данных с помощью облачной функции? (Python SDK) - PullRequest
0 голосов
/ 28 октября 2019

У меня есть облачная функция, которая запускается облачным Pub / Sub. Я хочу, чтобы та же функция запускала поток данных, используя Python SDK. Вот мой код:

import base64
def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : {}'.format(message))

Я развернул функцию следующим образом:

gcloud beta functions deploy hello_pubsub  --runtime python37 --trigger-topic topic1

Ответы [ 2 ]

0 голосов
/ 28 октября 2019

Вы должны встроить свой код Python для конвейера в вашу функцию. Когда вызывается ваша функция, вы просто вызываете основную функцию pipe python, которая выполняет конвейер в вашем файле.

Если вы разработали и опробовали свой конвейер в Cloud Shell и уже запустили его в конвейере Dataflow, ваш код долженимеют следующую структуру:

def run(argv=None, save_main_session=True):
  # Parse argument
  # Set options
  # Start Pipeline in p variable
  # Perform your transform in Pipeline
  # Run your Pipeline
  result = p.run()
  # Wait the end of the pipeline
  result.wait_until_finish()

Таким образом, вызовите эту функцию с правильным аргументом, особенно runner = DataflowRunner, чтобы позволить коду Python загружать конвейер в службе потока данных.

Удалить вконец result.wait_until_finish(), потому что ваша функция не будет жить весь процесс потока данных.

Вы также можете использовать шаблон, если хотите.

0 голосов
/ 28 октября 2019

Вы можете использовать Шаблоны облачных потоков данных для запуска вашей работы. Вам нужно будет выполнить следующие шаги:

  • Получить учетные данные
  • Создать экземпляр службы потока данных
  • Получить GCP PROJECT_ID
  • Создать тело шаблона
  • Выполнить шаблон

Вот пример использования вашего базового кода (не стесняйтесь разделить на несколько методов, чтобы уменьшить код внутри метода hello_pubsub).

from googleapiclient.discovery import build
import base64
import google.auth
import os

def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'

    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials)
    gcp_project = os.environ["GCLOUD_PROJECT"]

    template_path = gs://template_file_path_on_storage/
    template_body = {
        "parameters": {
            "keyA": "valueA",
            "keyB": "valueB",
        },
        "environment": {
            "envVariable": "value"
        }
    }

    request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
    response = request.execute()

    print(response)

В template_bodyпеременная, значения параметров - это аргументы, которые будут отправлены в ваш конвейер, а значения среды используются службой потока данных (serviceAccount, рабочие и сетевые конфигурации).

Документация LaunchTemplateParameters

Документация среды выполнения

...