Apache Airflow - получить все родительские task_ids - PullRequest
0 голосов
/ 17 февраля 2019

Предположим следующую ситуацию:

[c1, c2, c3] >> child_task

, где все c1, c2, c3 и child_task являются операторами и имеют task_id, равный id1, id2, id3 и child_id соответственно.

Задание child_task также является PythonOperator с provide_context=True и python_callable=dummy_func

def dummy_func(**context):
    #...

Можно ли получить всех родителей'идентификаторы внутри dummy_func (возможно, просматривая dag с использованием контекста)?

Ожидаемым результатом в этом случае будет список ['id1', 'id2', 'id3'].

1 Ответ

0 голосов
/ 17 февраля 2019

Свойства upstream_task_ids и downstream_task_ids из BaseOperator предназначены только для этой цели.

from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task_downstream_task_ids

Обратите внимание, однакочто с этим property вы получаете только непосредственного ( восходящего / нисходящего ) соседа (ей) задачи.Чтобы получить всех предков или потомков task с, вы можете быстро приготовить старый добрый теория графов такой подход, как этот BFS - как реализация

from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator

def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
    ancestor_task_ids: Set[str] = set()
    tasks_queue: Queue = Queue()
    # determine parent tasks to begin BFS
    for task in my_task.upstream_list:
        tasks_queue.put(item=task)
    # perform BFS
    while not tasks_queue.empty():
        task: BaseOperator = tasks_queue.get()
        ancestor_task_ids.add(element=task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(item=_task)
    # Convert task_ids to actual tasks
    ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
    return ancestor_tasks

Вышеупомянутый фрагмент НЕ проверен, но я уверен, что вы можете черпать вдохновение из него


Ссылки

...