Как установить приоритет для разных групп доступности базы данных в Airflow - PullRequest
1 голос
/ 02 июня 2019

Скажем, у нас есть два DAG, dag1 и dag2, они служат различным бизнес-требованиям. они совершенно не связаны. но dag1 важнее завершить как можно раньше.
Для простоты у них обоих есть только одна задача, и они выполняются ежедневно.

В сценарии, где dag1 отстает от графика на 2 или 3 дня, я хочу убедиться, что dag1 запускается и сначала завершает свои dag_runs, т. Е. Dag1 обновляется после того, как dag2 может продолжаться.

Я пробовал priority_weight, но он не работает для разных пакетов.

Мне нужен способ помещения этих задач из разных пакетов в одну очередь и достижения приоритетов на уровне DAG.

Ответы [ 2 ]

2 голосов
/ 05 июня 2019

Из официальной документации для внешнего датчика задачи :

Waits for a different DAG or a task in a different DAG to complete for
a specific execution_date.

    :param external_dag_id: The dag_id that contains the task you want to
        wait for
    :type external_dag_id: str
    :param external_task_id: The task_id that contains the task you want to
        wait for. If ``None`` the sensor waits for the DAG
    :type external_task_id: str
    :param allowed_states: list of allowed states, default is ``['success']``
    :type allowed_states: list
    :param execution_delta: time difference with the previous execution to
        look at, the default is the same execution_date as the current task or DAG.
        For yesterday, use [positive!] datetime.timedelta(days=1). Either
        execution_delta or execution_date_fn can be passed to
        ExternalTaskSensor, but not both.
    :type execution_delta: datetime.timedelta
    :param execution_date_fn: function that receives the current execution date
        and returns the desired execution dates to query. Either execution_delta
        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
    :type execution_date_fn: callable
    :param check_existence: Set to `True` to check if the external task exists (when
        external_task_id is not None) or check if the DAG to wait for exists (when
        external_task_id is None), and immediately cease waiting if the external task
        or DAG does not exist (default value: False).
    :type check_existence: bool

Для обоих групп обеспечения доступности баз данных depends_on_past Правило триггера должно быть установлено на True, чтобыболее новые запланированные прогоны DAG будут выполняться только в том случае, если предыдущие запланированные прогоны были успешно завершены.

Затем добавьте внешний датчик задачи в начале Dag 2 (тот, который выполняется позже).

В качестве альтернативыВы можете создать свой собственный датчик и использовать его через Плагины воздушного потока , чтобы проверять базу метаданных на статус Dag Runs.

Вы также можете создавать датчики клиента, которые используют либо XCOM воздушного потока или Переменные воздушного потока для передачи времени выполнения выполнения или любых других макросов воздушного потока на датчик в DAG 2.

0 голосов
/ 05 июня 2019

Я нашел специальное решение, в котором я мог бы просто обернуть оба мешка в слой блокировки.

Под этим подразумевается, что мы добавляем простую задачу в начале, которая блокирует определенную строку в базе данных, а затем в конце тега, мы также добавляем простую задачу, которая разблокирует заблокированную строку.
Поэтому, когда в настоящее время выполняется один из двух пакетов, а другой хочет начать работу, он просто блокируется, поскольку не может заблокировать конкретную строку, известную для обоих пакетов.

Ниже приведено простое описание фиксирующего слоя
dag1: оператор блокировки, задача 1, оператор блокировки.
dag2: оператор блокировки, задача 1, оператор блокировки.

Конечно, мы можем позволить завершить работу lock_operator, если он не может заблокировать строку, и установить значение retries_count очень высоким, поэтому мы гарантируем, что оно все еще будет пытаться заблокировать, пока не сможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...