Доступ к родительскому контексту dag во время создания подтега в потоке воздуха? - PullRequest
0 голосов
/ 18 февраля 2019

Я пытаюсь получить доступ во время создания подпадала к некоторым данным xcom из родительского dag, я искал, чтобы добиться этого в Интернете, но я ничего не нашел.

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )

получил эту ошибку с моимкод

1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath)
airflow_1  |   File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1  |     module = _load(spec)
airflow_1  |   File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1  |   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1  |   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1  |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1  |     'load_tasks', args),
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1  |     for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1  | TypeError: xcom_pull() missing 1 required positional argument: 'context'

1 Ответ

0 голосов
/ 18 февраля 2019

Ошибка проста: вам не хватает аргумента context, требуемого методом xcom_pull().Но вы действительно не можете просто создать context для перехода в этот метод;это Python словарь , который Airflow передает методам привязки, таким как pre_execute() и execute() из BaseOperator (родительский класс всех Operator s).

Другими словами, context становится доступным только тогда, когда Operator действительно выполняется, а не во время DAG -определения .И это имеет смысл, потому что в таксономии из Airflow, xcom s являются механизмом связи между task s в realtime : общение друг с другом во время работы.


Но в конце дня Xcom с, как и любая другая Airflow модель, сохраняются в back-end meta-db .Поэтому, конечно, вы можете напрямую получить его оттуда (очевидно, только XCOM task, которые работали в прошлом).Хотя у меня нет кода-фрагмента , вы можете взглянуть на cli.py, где они использовали SQLAlchemy ORM для игры с моделями и backend-db,Поймите, что это будет означать запрос, отправляемый на ваш backend-db каждый раз, когда файл определения DAG анализируется , что происходит довольно быстро.


Полезные ссылки


EDIT-1

После просмотра вашего кода-фрагмента Я встревожился.Предполагая, что значение, возвращаемое xcom_pull(), будет часто меняться, число task s в вашем dag также будет постоянно меняться .Это может привести к непредсказуемому поведению (вы должны провести немало исследований, но у меня нет к этому никакого отношения)

Я бы посоветовал вам пересмотреть всю задачу рабочий процесс и сжатие до конструкции, в которой - число task с и - структура DAG известны заранее (во время исполнение из dag-файл определения ).Вы можете, конечно, перебрать файл json / результат запроса SQL (как упоминалось ранее SQLAlchemy) и т. Д., Чтобы породить ваши фактические task s, но этот файл / db / независимо от того, что долженне часто меняются.


Поймите, что простая итерация по списку для генерации task s не является проблемой;что НЕ возможно, это иметь структуру вашего DAG в зависимости от результата upstream task.Например, вы не можете создать n task в вашей DAG на основе восходящей задачи , вычисляющей значение n во время выполнения.


Так что это невозможно

Но это возможно (включая то, что вы пытаетесь достичь; даже если то, как вы это делаете, не кажется хорошей идеей)

...