Как работать с файлами конфигурации в Airflow - PullRequest
0 голосов
/ 24 августа 2018

В Airflow мы создали несколько DAGS. Некоторые из них имеют общие свойства, например, каталог для чтения файлов. В настоящее время эти свойства перечислены как свойства в каждой отдельной группе обеспечения доступности баз данных, что, очевидно, станет проблематичным в будущем. Скажем, если имя каталога должно было измениться, нам нужно было бы войти в каждую группу обеспечения доступности баз данных и обновить этот фрагмент кода (возможно, даже пропуская его).

Я пытался создать какой-то файл конфигурации, который может быть проанализирован в Airflow и использован различными DAGS, когда требуется определенное свойство, но я не могу найти какую-либо документацию или руководство о том, как это сделать. этот. Больше всего я мог найти документацию по настройке идентификаторов подключений, но это не соответствует моему варианту использования.

Вопрос к моему посту, возможно ли сделать описанный выше сценарий и как?

Заранее спасибо.

1 Ответ

0 голосов
/ 24 августа 2018

Есть несколько способов сделать это в зависимости от настроек:

  • Вы можете использовать подход типа DagFactory, когда у вас есть функция, генерирующая группы доступности баз данных.Вы можете найти пример того, как это выглядит здесь

  • Вы можете сохранить конфигурацию JSON как переменную воздушного потока и проанализировать еесоздать DAG.Вы можете сохранить что-то вроде этого в Admin -> Переменные:

[{ "table": "users", "schema":"app_one", "s3_bucket":"etl_bucket", "s3_key":"app_one_users", "redshift_conn_id":"postgres_default" }, { "table": "users", "schema":"app_two", "s3_bucket":"etl_bucket", "s3_key":"app_two_users", "redshift_conn_id":"postgres_default"}]

Ваш DAG может быть сгенерирован как:

sync_config = json.loads(Variable.get("sync_config"))

with dag:
    start = DummyOperator(task_id='begin_dag')
    for table in sync_config:
        d1 = RedshiftToS3Transfer(
            task_id='{0}'.format(table['s3_key']),
            table=table['table'],
            schema=table['schema'],
            s3_bucket=table['s3_bucket'],
            s3_key=table['s3_key'],
            redshift_conn_id=table['redshift_conn_id']
        )
        start >> d1

Точно так же вы можете просто сохранить эту конфигурацию как локальный файл и открыть его, как любой другой файл.Имейте в виду, что лучший ответ на этот вопрос будет зависеть от вашей инфраструктуры и варианта использования.

...