Как обновить переменную задачи в зависимости от задачи, которая ее инициировала - PullRequest
0 голосов
/ 03 июня 2019

Я извлекаю данные из API и помещаю в bigquery.Я хотел бы динамически изменить таблицу bigquery, в которую загружаются данные.Например: Задача, вызывающая API для «списка» данных, должна быть добавлена ​​в таблицу списка в bigquery.

В настоящее время у меня есть 3 разных дагса.Один для каждого вызова API.Я хотел сжать их в одну группу обеспечения доступности баз данных, но я не уверен, как динамически изменять таблицу назначения в моей задаче load_to_bg.


#Config Variables

task1 = MailchimpToS3Operator(
        task_id='lists',
        mailchimp_conn_id = MC_CONN_ID,
        mailchimp_resource = 'lists',
        dag=dag
    )

task2 = MailchimpToS3Operator(
        task_id='camapaigns',
        mailchimp_conn_id = MC_CONN_ID,
        mailchimp_resource = 'campaigns'
        dag=dag
    )

task3 = MailchimpToS3Operator(
        task_id='memebers',
        mailchimp_conn_id = MC_CONN_ID,
        mailchimp_resource = 'members',
        dag=dag
    )

load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id='gcp_to_bq',
        destination_project_dataset_table = "mailchimp.mailchimp_{}".format(mailchimp_resource),
        dag=dag
        )

[task1,task2,task3] >> load_to_bq

Таблица destination_project_dataset_table должна обновиться, чтобы включить значение mailchimp_resource из запущенной задачиэто.

Ответы [ 2 ]

0 голосов
/ 04 июня 2019

Я думаю, что вы имеете в виду "Ветвление" в Airflow.Один из способов сделать это - использовать BranchPythonOperator .Этот оператор идет только по определенному пути в зависимости от результата вышестоящей задачи.

.enter image description here

См. Очень хорошее прочтение: https://medium.com/@guillaume_payen/use-conditional-tasks-with-apache-airflow-98bab35f1846 Документация по воздушному потоку: http://airflow.apache.org/concepts.html?highlight=branch#branching.

0 голосов
/ 04 июня 2019

Предполагая, что у вас есть определенный набор API, которые вы знаете заранее, или вы можете запросить во время выполнения, вы можете сделать следующее:

resources = [("resource name", "table to copy to")] # you can definitely call an api to get this resources list


with DAG("test_dag",schedule_interval="@daily") as dag:
    for res in resources:
        a = MailchimpToS3Operator(
            task_id=res[0],
            mailchimp_conn_id=MC_CONN_ID,
            mailchimp_resource=res[0],
            dag=dag
        )

        b = GoogleCloudStorageToBigQueryOperator(
            task_id='gcp_to_bq_'+res[0],
            destination_project_dataset_table = "mailchimp.mailchimp_"+res[1],
            dag=dag
            )

        a >> b
...