Я хочу запустить поток воздуха вот так ->
- У меня есть 2 рабочих потока воздуха W1 и W2.
- В W1 я запланировал одну задачу (W1-1), но в W2 я хочу создать X количество задач (W2-1, W2-2 ... W2-X).
- Число X и команда bash для каждой задачи будут получены из вызова БД.
- Все задачи для рабочего W2 должны выполняться параллельно после завершения W1.
Это мой код
dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')
t1 = BashOperator(
task_id='dummy_task',
bash_command='echo hi > /tmp/hi',
queue='W1_queue',
dag=dag)
get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"
db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)
cursor = connection.cursor()
cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
t = BashOperator(
task_id='script_test_'+str(i),
bash_command="{full_command} ".format(full_command=str(record[0])),
queue=str(record[1]),
dag=dag)
t.set_upstream(t1)
i += 1
cursor.close()
connection.close()
Однако, когда я запускаю это, задача на W1 успешно завершена, но все задачи на W2 не выполнены. В интерфейсе воздушного потока я вижу, что он может решить правильное количество задач (в данном случае 10), но каждая из этих 10 не выполнена.
Глядя на журналы, я увидел, что на W2 (который находится на другой машине) потоку воздуха не удалось найти файл db_creds.json
.
Я не хочу предоставлять файл кредитов БД для W2.
У меня вопрос, как в этом случае динамически создавать задачу воздушного потока?
По сути, я хочу выполнить запрос к БД на сервере воздушного потока и назначить задачи одному или нескольким работникам на основе результатов этого запроса. База данных будет содержать обновленную информацию о том, какие движки активны и т. Д. Я хочу, чтобы DAG отразила это. Из журналов похоже, что каждый работник выполняет запрос к БД. Предоставление доступа к БД каждому работнику не вариант.