Я попытался воспроизвести вашу проблему, а затем попытался ее решить, поэтому я создал ту же структуру папок, что у вас уже есть:
/dags/
dataflow.py
dataflow/
pipeline.py -> pipeline
setup.py
my_modules/
__init__.py
common.py
Поэтому, чтобы она работала, я сделал следующее изменение: эти папки в месте, где выполняется экземпляр, код может найти его, например, в папке /tmp/
экземпляра.
Итак, моя группа доступности баз данных будет выглядеть примерно так:
1 - Сначала я объявляю свои аргументы:
default_args = {
'start_date': datetime(xxxx, x, x),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'dataflow_default_options': {
'project': '<project>',
'region': '<region>',
'stagingLocation': 'gs://<bucket>/stage',
'tempLocation': 'gs://<bucket>/temp',
'setup_file': <setup.py>,
'runner': 'DataflowRunner'
}
}
2 - После этого я создал группу обеспечения доступности баз данных и перед запуском задачи «Поток данных» скопировал весь каталог папки, созданный выше, в /tmp/
папка экземпляра Task t1
, и после этого я запускаю конвейер из / tmp / directory Task t2
:
with DAG(
'composer_df',
default_args=default_args,
description='datflow dag',
schedule_interval="xxxx") as dag:
def copy_dependencies():
process = subprocess.Popen(['gsutil','cp', '-r' ,'gs://<bucket>/dags/*',
'/tmp/'])
process.communicate()
t1 = python_operator.PythonOperator(
task_id='copy_dependencies',
python_callable=copy_dependencies,
provide_context=False
)
t2 = DataFlowPythonOperator(task_id="composer_dataflow",
py_file='/tmp/dataflow/pipeline.py', job_name='job_composer')
t1 >> t2
Вот так я и создал файл DAG dataflow.py
, а затем в pipeline.py
пакет для импорта будет выглядеть так:
from my_modules import commons
Он должен работать нормально, поскольку каталог папки понятен для виртуальной машины.