У меня есть среда GCP Cloud Composer с версией воздушного потока composer-1.10.0-airflow-1.10.6
и python 3, если быть точным, 3,6. Я вызываю apache -поток в потоке данных с использованием оператора python_operator.PythonOperator
. Вот фрагмент кода
Вызов функции конвейера
test_greeting = python_operator.PythonOperator(
task_id='python_pipeline',
python_callable=run_pipeline
)
Функция конвейера выглядит следующим образом
def run_pipeline():
print("Test Pipeline")
pipeline_args=[
"--runner","DataflowRunner",
"--project","*****",
"--temp_location","gs://******/temp",
"--region","us-east1",
"--job_name","job1199",
"--zone","us-east1-b"
]
pipeline_options=PipelineOptions(pipeline_args)
pipe=beam.Pipeline(options=pipeline_options)
small_sum = (
pipe
| beam.Create([18,5,7,7,9,23,13,5])
| "Combine Globally" >> beam.CombineGlobally(AverageFn())
| 'Write results' >> beam.io.WriteToText('gs://******/ouptut_from_pipline/combine')
)
run_result=pipe.run()
run_result.wait_until_finish()
return "True"
Когда я запускаю это, конвейер запускается в потоке данных, но завершается со следующей ошибкой
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
work_executor.execute()
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 150, in execute
test_shuffle_sink=self._test_shuffle_sink)
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 116, in create_operation
is_streaming=False)
File "apache_beam/runners/worker/operations.py", line 1032, in apache_beam.runners.worker.operations.create_operation
File "apache_beam/runners/worker/operations.py", line 845, in apache_beam.runners.worker.operations.create_pgbk_op
File "apache_beam/runners/worker/operations.py", line 903, in apache_beam.runners.worker.operations.PGBKCVOperation.__init__
File "/usr/local/lib/python3.6/site-packages/apache_beam/internal/pickler.py", line 290, in loads
return dill.loads(s)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.6/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'unusual_prefix_162ac8b7030d5bd1ff5f128a26483932d3968a4d_python_bash'
Версия луча Apache Beam Python 3.6 SDK 2.19.0
. Я подозреваю, что версия Python 3.6 может быть проблемой, так как вызов конвейера напрямую (как бегун) из моей локальной системы работает нормально, и моя локальная система работает python 3.7.
Я не могу найти способ проверить эту теорию.
Было бы полезно получить советы о том, как решить эту проблему.