В основном я работаю с потоком воздуха и разработал задачу загрузки файла из внешнего источника.
t1 = PythonOperator(
task_id='download',
python_callable=download,
provide_context=True,
dag=dag)
, и этот поток воздуха выполняется в виртуальной среде (pipenv).
Функция загрузки:
def download(**kwargs):
folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
file_name = download_file(folder_id)
return file_name
, поэтому в основном я использую Xcons для передачи данных из одной задачи в другую ... и с помощью этой конфигурации невозможно управлять всеми зависимостями каждой группы DAG. .
В документации я обнаружил, что этот класс называется "PythonVirtualenvOperator", поэтому для его реализации я написал:
t1 = PythonVirtualenvOperator(
task_id='download',
python_callable=download,
requirements=['requests'],
python_version='3.8',
provide_context=True,
dag=dag
)
, и он дает мне следующую ошибку:
TypeError: can't pickle module objects
функция download_file - это соединение API, которое находится в другом файле.
есть предложения, как управлять средой и иметь связь между задачами?