Есть ли способ создания динамических рабочих процессов в Airflow - PullRequest
0 голосов
/ 11 сентября 2018

Итак, у меня есть задача А, которая копирует неизвестное количество файлов в папку.Задача B выполняется для каждого из этих файлов в папке.У меня нет возможности узнать количество файлов заранее, так как они постоянно меняются.Есть ли способ сделать эту работу в потоке воздуха.

spans = os.listdir('/home/abc/tmpFolder')
counter = 0
for s in spans:
    src_path = '/home/abc/tmpFolder' + s
    dst_path = "tmp/" + s
    counter += 1
    run_this = \
        FileToGoogleCloudStorageOperator(
            task_id='gcp_task_' + str(counter),
            src=src_path,
            dst=dst_path,
            bucket='gcpBucket',
            google_cloud_storage_conn_id='gcp',
            mime_type='text/plain',
            dag=dag
        )
    dummy_operator_two.set_downstream(run_this)

Я получаю имена всех файлов в каталоге, а затем запускаю для них оператор, но поток воздуха не работает таким образом, так как ему нужно знать номер заранее.

Ответы [ 4 ]

0 голосов
/ 12 сентября 2018

Я не ожидаю, что Airflow изменит DAG, когда DagRun активен, поэтому я не буду ставить деньги на получение файлов и добавление задач в той же DAG. При этом Airflow регенерирует DAG каждые несколько секунд. У вас может быть одна группа DAG, которая получает файлы, и другая группа DAG, которая обрабатывает эти файлы. После получения файлов первый DAG должен будет подождать минуту, чтобы убедиться, что Airflow заметит, а затем запустить второй DAG с TriggerDagRunOperator.

.

DAG1:

def wait_a_minute():
    time.sleep(60)

get_files = DummyOperator(dag=dag, task_id='get_files')
give_airflow_time_to_rebuild_DAG2 = PythonOperator(dag=dag, task_id='give_airflow_time_to_rebuild_DAG2', python_callable=wait_a_minute)
trigger_DAG2 = TriggerDagRunOperator(dag=dag, task_id='trigger_DAG2', trigger_dag_id='DAG2', execution_date='{{ ds }}')

get_files >> give_airflow_time_to_rebuild_DAG2 >> trigger_DAG2

DAG2:

pre_process = DummyOperator(dag=dag, task_id='pre_process')
post_process = DummyOperator(dag=dag, task_id='post_process')

files = get_files_to_process()

for file in files:
    process = DummyOperator(dag=dag, task_id=f'process_{file}')
    pre_process >> process >> post_process

Больше взломать, чем решение, но что-то вроде этого должно работать. Есть проблемы с внешними триггерами и динамическими задачами. Обычно я сталкиваюсь с проблемами планировщика, когда приходится использовать depends_on_past=True.

0 голосов
/ 11 сентября 2018

Вы пытались использовать glob module и модифицировать свой конвейер для обработки всех файлов в данном каталоге?

0 голосов
/ 11 сентября 2018

Относительно моего блога о создании динамического рабочего процесса с использованием Apache Airflow , вы можете протестировать следующий код:

import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from datetime import datetime,timedelta
from os import listdir

default_args = {
  'owner': 'test',
  'depends_on_past': False,
  'start_date': airflow.utils.dates.days_ago(1),
  'retries': 1,
  'retry_delay': timedelta(minutes=5)
}

dag = DAG('dynamic',
  default_args=default_args,
  description='Dynamic DAG',
  schedule_interval=timedelta(days=1))

copy_files = BashOperator(task_id='cp_files',
   depends_on_past=False,
   bash_command='cp /tmp/filetocopy/* /tmp/filetoprocess',
   dag=dag)

start = DummyOperator(task_id='start',
                     dag=dag)
end = DummyOperator(task_id='end',
                   dag=dag)

start >> copy_files

spans = listdir('/tmp/filetoprocess')
counter = 1

for s in spans:
  src_path = '/tmp/filetoprocess/' + s
  dst_path = "/tmp/dest/" + s
  counter += 1
  task = FileToGoogleCloudStorageOperator(
            task_id='gcp_task_' + str(counter),
            src=src_path,
            dst=dst_path,
            bucket='gcpBucket',
            google_cloud_storage_conn_id='gcp',
            mime_type='text/plain',
            dag=dag)
  task.set_upstream(copy_files)
  task.set_downstream(end)

С этим кодом у вас уже должны быть некоторые файлы (вы также можете создать функцию Python, которая проверяет наличие файлов, в противном случае создайте DummyOperator, чтобы весь рабочий процесс работал) в папке /tmp/filetoprocess;в противном случае у планировщика Airflow возникнет проблема с созданием правильной группы доступности базы данных.

Я протестировал ее с новым выпуском Apache Airflow (v.1.10), и, похоже, он работает отлично.

Динамические задания на Airflow DAG

0 голосов
/ 11 сентября 2018

У меня есть такая штука, когда я делаю отдельные конвейеры вместо отдельных задач.

...