Предостережение: в интересах времени я собираюсь собрать воедино идеи и примеры кода из сторонних источников. Я воздаю должное этим источникам, чтобы вы могли взглянуть на контекст и документацию. Дополнительное предупреждение: я не смог проверить это, но я уверен на 99%, что это сработает.
Сложной частью всей этой операции будет выяснение того, как обращаться с вашими кампаниями, которые могли закончиться и начать резервное копирование. Воздушному потоку не понравится DAG с движущейся датой начала или окончания. Перемещение даты остановки может работать немного лучше, перемещение даты начала для dag не работает вообще. Тем не менее, если есть продленные кампании, вы должны быть в состоянии перенести дату окончания до тех пор, пока в непрерывности нет пробелов. Если у вас есть кампания, которая заканчивается, а затем продлевается на пару неактивных дней между ними, вы, вероятно, захотите выяснить, как сделать эти две кампании уникальными для воздушного потока.
Первый шаг
Вы захотите создать скрипт на python, который будет вызывать вашу базу данных и возвращать соответствующие данные из ваших кампаний. Предполагая, что это в MySQL, это будет выглядеть примерно так, пример соединения из документации по пакету pip PyMySQL :
import pymysql.cursors
# Connect to the database
connection = pymysql.connect(host='localhost',
user='user',
password='passwd',
db='db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
try:
with connection.cursor() as cursor:
# Create a new record
sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
# connection is not autocommit by default. So you must commit to save
# your changes.
connection.commit()
with connection.cursor() as cursor:
# Read a single record
sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
cursor.execute(sql, ('webmaster@python.org',))
result = cursor.fetchall()
finally:
connection.close()
Второй шаг
Вы захотите пройтись по этому курсору и динамически создавать свои метки, аналогичные этому примеру из Astronomer.io :
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def create_dag(dag_id,
schedule,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval=schedule,
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py,
dag_number=dag_number)
return dag
# build a dag for each number in range(10)
for campaign in result: # This is the pymysql result from above
dag_id = 'hello_world_{}'.format(str(n))
default_args = {'owner': 'airflow',
'start_date': datetime(2018, 1, 1)
}
schedule = '@daily'
dag_number = n
globals()[dag_id] = create_dag(dag_id,
schedule,
dag_number,
default_args)
Если вы разместите весь этот код в одном файле, он должен находиться в вашей папке dags . Когда в вашей базе данных появится новая кампания, вы создадите из нее dag и сможете использовать свою архитектуру subdag для выполнения точно такого же набора шагов / задач с параметрами, извлеченными из этой базы данных MySQL. Чтобы быть в безопасности и сохранить последние кампании в своем списке, я бы написал запрос mysql с буфером дат. Таким образом, у вас все еще есть даг, которые недавно закончились в вашем списке. В день окончания этих дагов вы должны заполнить end_date
аргумент даг.