Динамически создавать задачи в потоке воздуха с внешним файлом - PullRequest
1 голос
/ 10 января 2020

Я пытаюсь создать группу обеспечения доступности баз данных, которая генерирует задачи динамически на основе файла JSON, расположенного в хранилище. Я следовал этому руководству шаг за шагом:

https://bigdata-etl.com/apache-airflow-create-dynamic-dag/

Но DAG застревает со следующим сообщением:

Dag getting stuck

Можно ли прочитать внешний файл и использовать его для динамического создания задач в Composer? Я могу сделать это, когда я читаю данные только из переменной воздушного потока, но когда я читаю внешний файл, метка застревает в состоянии isn't available in the web server's DagBag object. Мне нужно читать из внешнего файла, так как содержимое JSON будет меняться при каждом выполнении.

Я использую composer-1.8.2-airflow-1.10.2.

Я прочитал ответ на аналогичный вопрос:

Dynami c определение задачи в Airflow

Но я не пытаюсь создавать задачи на основе отдельной задачи, только на основе внешнего файла.

Это мой второй подход, который также застревает в этом состоянии ошибки:

import datetime

import airflow
from airflow.operators import bash_operator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
import json
import os

products = json.loads(Variable.get("products"))

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2020, 1, 10),
}

with airflow.DAG(
        'json_test2',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:


        # Print the dag_run's configuration, which includes information about the
        # Cloud Storage object change.
        def read_json_file(file_path):
            if os.path.exists(file_path):
                with open(file_path, 'r') as f:
                    return json.load(f)

        def get_run_list(files):
            run_list = []
            #The file is uploaded in the storage bucket used as a volume by Composer
            last_exec_json = read_json_file("/home/airflow/gcs/data/last_execution.json")
            date = last_exec_json["date"]
            hour = last_exec_json["hour"]
            for file in files:
                #Testing by adding just date and hour
                name = file['name']+f'_{date}_{hour}'
                run_list.append(name)
            return run_list

        rl = get_run_list(products)

        start = DummyOperator(task_id='start', dag=dag)
        end = DummyOperator(task_id='end', dag=dag)

        for name in rl:
            tsk = DummyOperator(task_id=name, dag=dag)
            start >> tsk >> end

1 Ответ

0 голосов
/ 14 января 2020

Можно создать группу обеспечения доступности баз данных, которая генерирует задачу динамически на основе файла JSON, который находится в корзине облачного хранилища. Я следовал инструкции, которую вы предоставили, и она отлично работает в моем случае.

Сначала вам нужно загрузить файл конфигурации JSON в каталог $AIRFLOW_HOME/dags, а затем файл DAG python по тому же пути. (вы можете найти путь в файле airflow.cfg, который находится в корзине).

Позже вы сможете увидеть DAG в интерфейсе Airflow:

enter image description here

Как видно из журнала DAG isn't available in the web server's DagBag object, группа доступности базы данных недоступна на веб-сервере Airflow. Однако DAG можно запланировать как активную, поскольку Airflow Scheduler работает независимо с веб-сервером Airflow.

Когда множество DAG загружаются одновременно в среду Composer, это может привести к перегрузке среды. Поскольку веб-сервер Airflow находится в проекте, управляемом Google, только некоторые типы обновлений приведут к перезапуску контейнера веб-сервера, например, к добавлению или обновлению одного из пакетов PyPI или изменению параметра Airflow. Обходной путь - добавить фиктивную переменную среды:

  • Открыть Composer экземпляр в GCP
  • ENVIRONMENT VARIABLE tab
  • Edit, затем добавить переменную среды и Submit

Вы можете использовать следующую команду для его перезапуска:

gcloud composer environments update ${ENVIRONMENT_NAME}  --location=${ENV_LOCATION}  --update-airflow-configs=core-dummy=true
gcloud composer environments update ${ENVIRONMENT_NAME}  --location=${ENV_LOCATION}  --remove-airflow-configs=core-dummy

Надеюсь, вы найдете приведенные выше сведения полезными.

...