Компоновщик GCP / вызов воздушного потока Ошибка потока данных / выброса луча - PullRequest
0 голосов
/ 09 апреля 2020

У меня есть среда GCP Cloud Composer с версией воздушного потока composer-1.10.0-airflow-1.10.6 и python 3, если быть точным, 3,6. Я вызываю apache -поток в потоке данных с использованием оператора python_operator.PythonOperator. Вот фрагмент кода

Вызов функции конвейера

test_greeting = python_operator.PythonOperator(
    task_id='python_pipeline',
    python_callable=run_pipeline
)

Функция конвейера выглядит следующим образом

def run_pipeline():
    print("Test Pipeline")

    pipeline_args=[
        "--runner","DataflowRunner",
        "--project","*****",
        "--temp_location","gs://******/temp",
        "--region","us-east1",
        "--job_name","job1199",
        "--zone","us-east1-b"

    ]

    pipeline_options=PipelineOptions(pipeline_args)
    pipe=beam.Pipeline(options=pipeline_options)

    small_sum = (
        pipe
        | beam.Create([18,5,7,7,9,23,13,5])
        | "Combine Globally" >> beam.CombineGlobally(AverageFn())
        | 'Write results' >> beam.io.WriteToText('gs://******/ouptut_from_pipline/combine')
    )

    run_result=pipe.run()
    run_result.wait_until_finish()

    return "True"

Когда я запускаю это, конвейер запускается в потоке данных, но завершается со следующей ошибкой

    Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 150, in execute
    test_shuffle_sink=self._test_shuffle_sink)
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 116, in create_operation
    is_streaming=False)
  File "apache_beam/runners/worker/operations.py", line 1032, in apache_beam.runners.worker.operations.create_operation
  File "apache_beam/runners/worker/operations.py", line 845, in apache_beam.runners.worker.operations.create_pgbk_op
  File "apache_beam/runners/worker/operations.py", line 903, in apache_beam.runners.worker.operations.PGBKCVOperation.__init__
  File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'unusual_prefix_162ac8b7030d5bd1ff5f128a26483932d3968a4d_python_bash'

Версия луча Apache Beam Python 3.6 SDK 2.19.0. Я подозреваю, что версия Python 3.6 может быть проблемой, так как вызов конвейера напрямую (как бегун) из моей локальной системы работает нормально, и моя локальная система работает python 3.7.

Я не могу найти способ проверить эту теорию.

Было бы полезно получить советы о том, как решить эту проблему.

...