Ошибка проста: вам не хватает аргумента context
, требуемого методом xcom_pull()
.Но вы действительно не можете просто создать context
для перехода в этот метод;это Python
словарь , который Airflow
передает методам привязки, таким как pre_execute()
и execute()
из BaseOperator
(родительский класс всех Operator
s).
Другими словами, context
становится доступным только тогда, когда Operator
действительно выполняется, а не во время DAG
-определения .И это имеет смысл, потому что в таксономии из Airflow
, xcom
s являются механизмом связи между task
s в realtime : общение друг с другом во время работы.
Но в конце дня Xcom
с, как и любая другая Airflow
модель, сохраняются в back-end meta-db .Поэтому, конечно, вы можете напрямую получить его оттуда (очевидно, только XCOM task
, которые работали в прошлом).Хотя у меня нет кода-фрагмента , вы можете взглянуть на cli.py
, где они использовали SQLAlchemy
ORM для игры с моделями и backend-db,Поймите, что это будет означать запрос, отправляемый на ваш backend-db каждый раз, когда файл определения DAG
анализируется , что происходит довольно быстро.
Полезные ссылки
EDIT-1
После просмотра вашего кода-фрагмента Я встревожился.Предполагая, что значение, возвращаемое xcom_pull()
, будет часто меняться, число task
s в вашем dag
также будет постоянно меняться .Это может привести к непредсказуемому поведению (вы должны провести немало исследований, но у меня нет к этому никакого отношения)
Я бы посоветовал вам пересмотреть всю задачу рабочий процесс и сжатие до конструкции, в которой - число task
с и - структура DAG
известны заранее (во время исполнение из dag-файл определения ).Вы можете, конечно, перебрать файл json
/ результат запроса SQL
(как упоминалось ранее SQLAlchemy
) и т. Д., Чтобы породить ваши фактические task
s, но этот файл / db / независимо от того, что долженне часто меняются.
Поймите, что простая итерация по списку для генерации task
s не является проблемой;что НЕ возможно, это иметь структуру вашего DAG
в зависимости от результата upstream
task
.Например, вы не можете создать n task
в вашей DAG
на основе восходящей задачи , вычисляющей значение n во время выполнения.
Так что это невозможно
Но это возможно (включая то, что вы пытаетесь достичь; даже если то, как вы это делаете, не кажется хорошей идеей)