Использование БД для динамического создания задач воздушного потока - PullRequest
0 голосов
/ 02 июля 2018

Я хочу запустить поток воздуха вот так ->

  • У меня есть 2 рабочих потока воздуха W1 и W2.
  • В W1 я запланировал одну задачу (W1-1), но в W2 я хочу создать X количество задач (W2-1, W2-2 ... W2-X).
  • Число X и команда bash для каждой задачи будут получены из вызова БД.
  • Все задачи для рабочего W2 должны выполняться параллельно после завершения W1.

Это мой код

dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')

t1 = BashOperator(
        task_id='dummy_task',
        bash_command='echo hi > /tmp/hi',
        queue='W1_queue',
        dag=dag)

get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"

db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)

cursor = connection.cursor()

cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
    t = BashOperator(
        task_id='script_test_'+str(i),
        bash_command="{full_command} ".format(full_command=str(record[0])),
        queue=str(record[1]),
        dag=dag)
    t.set_upstream(t1)
    i += 1

cursor.close()
connection.close()

Однако, когда я запускаю это, задача на W1 успешно завершена, но все задачи на W2 не выполнены. В интерфейсе воздушного потока я вижу, что он может решить правильное количество задач (в данном случае 10), но каждая из этих 10 не выполнена.

Глядя на журналы, я увидел, что на W2 (который находится на другой машине) потоку воздуха не удалось найти файл db_creds.json.

Я не хочу предоставлять файл кредитов БД для W2.

У меня вопрос, как в этом случае динамически создавать задачу воздушного потока? По сути, я хочу выполнить запрос к БД на сервере воздушного потока и назначить задачи одному или нескольким работникам на основе результатов этого запроса. База данных будет содержать обновленную информацию о том, какие движки активны и т. Д. Я хочу, чтобы DAG отразила это. Из журналов похоже, что каждый работник выполняет запрос к БД. Предоставление доступа к БД каждому работнику не вариант.

Ответы [ 2 ]

0 голосов
/ 05 июля 2018

Спасибо @ viraj-parekh и @ cwurtz.

После долгих проб и ошибок нашел правильный способ использования переменных воздушного потока для этого случая.

Шаг 1) Мы создаем еще один скрипт с именем gen_var.py и помещаем его в папку dag. Таким образом, планировщик подберет и сгенерирует переменные. Если код для генерации переменных находится внутри тега deploy_single, то мы столкнемся с той же проблемой зависимости, что и работник, который попытается обработать этот тег тоже.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import json
import psycopg2
from airflow.models import Variable
from psycopg2.extensions import AsIs

get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"

db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)

cursor = connection.cursor()

cursor.execute(get_all_engines)
records = cursor.fetchall()

hosts = {}
i = 1
for record in records:
    comm_dict = {}
    comm_dict['full_command'] = str(record[0])
    comm_dict['queue_name'] = str(record[1])
    hosts[i] = comm_dict
    i += 1

cursor.close()
connection.close()

Variable.set("hosts",hosts,serialize_json=True)

Обратите внимание на звонок serialize_json. Airflow попытается сохранить переменную в виде строки. Если вы хотите, чтобы оно сохранялось как диктовку, используйте serialize_json=True. Воздушный поток все равно сохранит его в виде строки через json.dumps

Шаг 2) Упростите dag и назовите эту переменную "hosts" (теперь десериализовать, чтобы вернуть диктовку) примерно так -

hoztz = Variable.get("hosts",deserialize_json=True)
for key in hoztz:
    host = hoztz.get(key)
    t = BashOperator(
        task_id='script_test_'+str(key),
        bash_command="{full_command} ".format(full_command=str(host.get('full_command'))),
        queue=str(host.get('queue_name')),
        dag=dag)
    t.set_upstream(t1)

Надеюсь, это поможет кому-то еще.

0 голосов
/ 03 июля 2018

Один из способов сделать это - сохранить информацию в Переменной воздушного потока .

Вы можете получить информацию, необходимую для динамического создания группы DAG (и необходимых конфигов) в переменной, и иметь W2 доступ к ней оттуда.

Переменные - это модель воздушного потока , которую можно использовать для хранения статической информации (информации, не имеющей связанной метки времени), к которой могут обращаться все задачи.

...