Как я могу создать группу обеспечения доступности баз данных, которая имеет динамический набор задач относительно даты выполнения? - PullRequest
3 голосов
/ 09 июля 2019

У нас есть DAG, которая получает некоторые данные с рекламной платформы.Эти объявления организованы в кампании.Наша цель - использовать метрики высокого уровня для этих кампаний.Для этого нам сначала нужно получить список активных кампаний на указанную дату выполнения - к счастью, API рекламной платформы делает это тривиальным, при условии, что мы знаем временной диапазон, о котором мы хотели бы узнать.

В настоящее времянаша группа обеспечения доступности баз данных структурирована так, чтобы собирать эти кампании и затем сохранять их в S3 и, наконец, в Redshift.Затем мы запрашиваем Redshift перед настройкой последующих задач, которые извлекают данные для каждой кампании.Это грубая часть.Мы могли бы также взглянуть на S3, но проблема в том, что ключи имеют значение макроса ds.Кажется, нет способа узнать это значение при создании самой группы доступности базы данных.

Наш текущий подход также не знает о дате выполнения, поэтому он всегда запрашивает все кампании, даже если эти кампании неактивен для интересующего нас периода времени.

Чтобы сделать это немного более конкретным, вот как выглядит этот DAG сегодня:

DAG of Ad Platform Campaign Metrics

Другим подходом было бы свести все это в один оператор, который заключает в себе получение набора кампаний на текущую дату выполнения и затем получение метрик для каждой из этих кампаний.Мы избежали этого, потому что это, по-видимому, исключает возможность извлечения данных параллельно с помощью отдельных задач для каждой кампании.

Как мы можем создать эту группу обеспечения доступности баз данных, чтобы мы поддерживали распараллеливание, предлагаемое путем динамического запроса таблиц Redshift для кампаний, но кампанииправильно ограничен датой исполнения?

Ответы [ 2 ]

0 голосов
/ 17 июля 2019

Предостережение: в интересах времени я собираюсь собрать воедино идеи и примеры кода из сторонних источников. Я воздаю должное этим источникам, чтобы вы могли взглянуть на контекст и документацию. Дополнительное предупреждение: я не смог проверить это, но я уверен на 99%, что это сработает.

Сложной частью всей этой операции будет выяснение того, как обращаться с вашими кампаниями, которые могли закончиться и начать резервное копирование. Воздушному потоку не понравится DAG с движущейся датой начала или окончания. Перемещение даты остановки может работать немного лучше, перемещение даты начала для dag не работает вообще. Тем не менее, если есть продленные кампании, вы должны быть в состоянии перенести дату окончания до тех пор, пока в непрерывности нет пробелов. Если у вас есть кампания, которая заканчивается, а затем продлевается на пару неактивных дней между ними, вы, вероятно, захотите выяснить, как сделать эти две кампании уникальными для воздушного потока.

Первый шаг

Вы захотите создать скрипт на python, который будет вызывать вашу базу данных и возвращать соответствующие данные из ваших кампаний. Предполагая, что это в MySQL, это будет выглядеть примерно так, пример соединения из документации по пакету pip PyMySQL :

import pymysql.cursors

# Connect to the database
connection = pymysql.connect(host='localhost',
                             user='user',
                             password='passwd',
                             db='db',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)

try:
    with connection.cursor() as cursor:
        # Create a new record
        sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
        cursor.execute(sql, ('webmaster@python.org', 'very-secret'))

    # connection is not autocommit by default. So you must commit to save
    # your changes.
    connection.commit()

    with connection.cursor() as cursor:
        # Read a single record
        sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
        cursor.execute(sql, ('webmaster@python.org',))
        result = cursor.fetchall()
finally:
    connection.close()

Второй шаг

Вы захотите пройтись по этому курсору и динамически создавать свои метки, аналогичные этому примеру из Astronomer.io :

from datetime import datetime

from airflow import DAG

from airflow.operators.python_operator import PythonOperator


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)

    return dag


# build a dag for each number in range(10)
for campaign in result:  # This is the pymysql result from above
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'

    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

Если вы разместите весь этот код в одном файле, он должен находиться в вашей папке dags . Когда в вашей базе данных появится новая кампания, вы создадите из нее dag и сможете использовать свою архитектуру subdag для выполнения точно такого же набора шагов / задач с параметрами, извлеченными из этой базы данных MySQL. Чтобы быть в безопасности и сохранить последние кампании в своем списке, я бы написал запрос mysql с буфером дат. Таким образом, у вас все еще есть даг, которые недавно закончились в вашем списке. В день окончания этих дагов вы должны заполнить end_date аргумент даг.

0 голосов
/ 10 июля 2019

Я не верю, что это возможно. DAG может отображать только в одной конфигурации, определенной в определении Python группы DAG. Вы не сможете контролировать, какая версия группы обеспечения доступности баз данных зависит от даты выполнения, поэтому вы не сможете, например, оглянуться назад на то, как DAG должен отображаться в прошлом. Если вы хотите, чтобы текущая группа обеспечения доступности баз данных отображалась на основе даты выполнения, вы можете написать некоторую логику в определении Python вашей группы обеспечения доступности баз данных.

В зависимости от того, как вы координируете свои задания Airflow, вы можете иметь одного оператора, как вы описали, но иметь этот единственный оператор, чтобы запускать параллельные запросы в Redshift и завершать, когда все запросы завершены.

...