Я настраиваю многопользовательский кластер воздушного потока для команды исследователей данных, с различными вариантами использования для групп доступности баз данных (ETL, NLP, ML, NN ...), некоторые из которых имеют определенные зависимости python.Я не могу просто добавить все зависимости DAG на системном уровне.Конечно, я могу настроить базовый уровень для общего использования, но для конкретных нужд будет очень полезно положиться на эту функцию архива DAG.
Итак, для решения этой многоконтекстной проблемы ятестирование упакованной функции DAG в Airflow 1.9.0 (в Ubuntu 16.04).
Я следую примеру, чтобы протестировать его с произвольным пакетом pypi.
- Я случайно выбрал модуль python (python-crontab).(до этого я пробовал использовать более мощные модули, но воспроизведение тестов заняло больше времени)
- сценарий тестирования: возможность импортировать этот модуль и распечатать его версию в виде архива DAG
вот как я это сделал:
$ virtualenv venv --python=python3
$ source venv/bin/activate
(venv) $ mkdir contents && cd contents
$ pip install --install-option="--install-lib=$PWD" python-crontab
$ cp ../my_dag.py .
$ zip -r ../test_zip_2.zip *
$ cp ../test_zip_2.zip /path/to/dags
$ journalctl -f -u airflow-scheduler.service
(...)
WARNING - No viable dags retrieved from /path/to/dags/test_zip_2.zip
содержимое моего DAG:
import crontab
import airflow.utils.dates as a_dates
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from pprint import pprint
args = {
'owner': 'airflow',
'start_date': a_dates.days_ago(1)
}
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
print(crontab.__version__)
return 'Whatever you return gets printed in the logs'
with DAG(dag_id='test_zip', default_args=args, schedule_interval=None) as dag:
(
PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
)
>> DummyOperator(
task_id='do_nothing'
)
)
После проверки кода, похоже, что логика, которая анализирует ZIP-файл, немедленно завершается, если он находит .py-файл, который не содержит слов «DAG» и «airflow».
Проблема в том, что методЯ описал выше, на самом деле генерирует другие .py файлы в корне архива.
$ ll
total 100
drwxr-xr-x 1 vagrant vagrant 442 Jun 1 14:48 ./
drwxr-xr-x 1 vagrant vagrant 306 Jun 1 15:30 ../
-rw-rw-r-- 1 vagrant vagrant 3904 Dec 30 2015 cronlog.py
-rw-rw-r-- 1 vagrant vagrant 44651 May 25 16:44 crontab.py
-rw-rw-r-- 1 vagrant vagrant 4438 Dec 28 2015 crontabs.py
drwxr-xr-x 1 vagrant vagrant 476 Jun 1 14:26 dateutil/
-rw-r--r-- 1 vagrant vagrant 6148 Jun 1 14:24 .DS_Store
drwxr-xr-x 1 vagrant vagrant 204 Jun 1 14:26 __pycache__/
drwxr-xr-x 1 vagrant vagrant 272 Jun 1 14:26 python_crontab-2.3.3-py3.5.egg-info/
drwxr-xr-x 1 vagrant vagrant 306 Jun 1 14:26 python_dateutil-2.7.3-py3.5.egg-info/
drwxr-xr-x 1 vagrant vagrant 238 Jun 1 14:26 six-1.11.0-py3.5.egg-info/
-rw-rw-r-- 1 vagrant vagrant 30888 Sep 17 2017 six.py
-rw-r--r-- 1 vagrant vagrant 832 Jun 1 14:48 my_dag.py
Многие из известных пакетов, которые я тестировал, генерируют эти файлы .py верхнего уровня.Например.установка scrapy, numpy, pandas и т. д. вызвала тот же беспорядок.
Итак, какие могут быть варианты (без разветвления воздушного потока ^ _ ^)?
Правильно ли я понимаю эту функцию?
Спасибо за вашу помощь!