Запуск задачи на composer DAG из события прибытия файла из Cloud Function - PullRequest
0 голосов
/ 30 апреля 2020

Могу ли я запустить задачу воздушного потока из облачной функции?

В основном моя проблема заключается в следующем. У меня есть файл, который поступает в облачное хранилище Google. Несколько файлов в одном DAG. Мне нужно вызвать задание преобразования, когда файл прибывает. Я думал использовать функцию облака. Но в моей DAG много зависимых заданий.

Любая помощь приветствуется

Ответы [ 2 ]

0 голосов
/ 06 мая 2020

Вам не обязательно использовать облачную функцию для определения файла в GCS, Composer имеет датчики GCS, которые можно использовать для достижения цели.

Предположим, вам необходимо отслеживать файлы в корзине / папке. /file_*.csv затем:

from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
    from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor 
    import datetime as dt
    from airflow.models import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    lasthour = dt.datetime.now() - dt.timedelta(hours=1)

    args = {
     'owner': 'airflow',
     'start_date': lasthour,
     'depends_on_past': False,
    }
    dag = DAG(
     dag_id='GCS_sensor_dag',
     schedule_interval=None,
     default_args=args
    )
    GCS_File_list = GoogleCloudStorageListOperator(
                        task_id= 'list_Files',
                        bucket= 'bucketname',
                        prefix='folder/file_',
                        delimiter='.csv',
                        google_cloud_storage_conn_id='google_cloud_default',
                        dag = dag
                    )
    file_sensor = GoogleCloudStoragePrefixSensor(
                        task_id='gcs_polling',  
                        bucket='bucketname',
                        prefix='folder/file_',
                        dag=dag
                    )

    trigger = TriggerDagRunOperator(
                        task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
                        trigger_dag_id="GCS_sensor_dag",
                        dag=dag
                    )

file_sensor >> GCS_File_list >> trigger
0 голосов
/ 30 апреля 2020

Вы можете вызвать группы доступности баз данных в ответ на изменение в хранилище Cloud Storage. Для этого sh облачные Composer DAG могут запускаться облачными функциями. Уже есть отличная официальная документация и Codelabs , которые описывают рабочий процесс. Он будет работать следующим образом:

  1. Загрузить файл в корзину Cloud Storage, которая будет
  2. Запускать облачную функцию с использованием Python / Node.JS runtime
  3. This Функция выполнит DAG в облаке Composer

Помните об одной вещи. Когда вы будете на шаге Creating your function. Вам необходимо заполнить эту строку: const WEBSERVER_ID = 'your-tenant-project-id';. Чтобы извлечь эту переменную, go в Airflow UI, чем Admin -> Configuration, и найдите ключ base_url, который является вашим webserver-id (без https:// и .appspot.com частей).

Другой способ сделать это, используя следующую команду:

gcloud composer environments describe <ENVIRONMENT_NAME> --location <LOCATION>

И вы сможете увидеть конфигурацию: -> airflowUri переменная.

Я пробовал этот сценарий один раз и это работает довольно хорошо. Не стесняйтесь задавать больше вопросов. Я надеюсь, что вы найдете вышеупомянутую информацию полезной.

...