Как использовать PythonVirtualenvOperator в воздушном потоке? - PullRequest
2 голосов
/ 08 ноября 2019

В основном я работаю с потоком воздуха и разработал задачу загрузки файла из внешнего источника.

t1 = PythonOperator(
        task_id='download',
        python_callable=download,
        provide_context=True,
        dag=dag)

, и этот поток воздуха выполняется в виртуальной среде (pipenv).

Функция загрузки:

def download(**kwargs):
   folder_id = 'xxxxxx-xxxx-xxxx-xxxxxx'
   file_name = download_file(folder_id)
   return file_name

, поэтому в основном я использую Xcons для передачи данных из одной задачи в другую ... и с помощью этой конфигурации невозможно управлять всеми зависимостями каждой группы DAG. .

В документации я обнаружил, что этот класс называется "PythonVirtualenvOperator", поэтому для его реализации я написал:

t1 = PythonVirtualenvOperator(
        task_id='download',
        python_callable=download,
        requirements=['requests'],
        python_version='3.8',
        provide_context=True,
        dag=dag
    )

, и он дает мне следующую ошибку:

TypeError: can't pickle module objects

функция download_file - это соединение API, которое находится в другом файле.

есть предложения, как управлять средой и иметь связь между задачами?

...