Вы должны встроить свой код Python для конвейера в вашу функцию. Когда вызывается ваша функция, вы просто вызываете основную функцию pipe python, которая выполняет конвейер в вашем файле.
Если вы разработали и опробовали свой конвейер в Cloud Shell и уже запустили его в конвейере Dataflow, ваш код долженимеют следующую структуру:
def run(argv=None, save_main_session=True):
# Parse argument
# Set options
# Start Pipeline in p variable
# Perform your transform in Pipeline
# Run your Pipeline
result = p.run()
# Wait the end of the pipeline
result.wait_until_finish()
Таким образом, вызовите эту функцию с правильным аргументом, особенно runner = DataflowRunner
, чтобы позволить коду Python загружать конвейер в службе потока данных.
Удалить вконец result.wait_until_finish()
, потому что ваша функция не будет жить весь процесс потока данных.
Вы также можете использовать шаблон, если хотите.