Воздушный поток: создать единовременный динамический SubDag - PullRequest
0 голосов
/ 14 декабря 2018

Я создаю DAG, который выполняет задачи, которые были предварительно определены в некоторой базе данных.после выполнения задач я обновляю время их выполнения до тех пор, пока они не будут выполнены снова.Цель каждой задачи в основном состоит в проведении sql unittesting.

До сих пор я пытался

  1. создавать основной родительский dag
  2. получать список задач из базы данных
  3. для каждой строки(задача) - я создаю подзадачу, которая содержит процесс выполнения
  4. , когда все подзадачи завершаются - я обновляю время выполнения задач

, в настоящее время он не выполняется после того, какпервый забег.ошибка, которая отображается Broken DAG: [/usr/local/airflow/src/dags/d06-query_validations/d06-query_validations_daily.py] list index out of range.пожалуйста, помогите разобраться в чем проблема

что я пробовал до сих пор:

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 11, 25, 8, 15),
'wait_for_downstream': True,
'email': email_list,
'email_on_failure': True,
'email_on_retry': False
}


def getValidationsToRun():
    start_time = datetime.now()
    conn = MySqlHook(mysql_conn_id='mysql_main', kwargs={"charset": "utf8"})
    query = ReadTextFile('/d06-query_validations/get_validations.sql')
    logging.log(logging.INFO, "Extract Query={}".format(query))
    records = conn.get_pandas_df(query)
    logging.log(logging.INFO, "Extract completed. it took: 
    {}".format(str(datetime.now() - start_time)))
    return records


def create_subdag(parent_dag_name, child_dag_name, validation):
    inner_dag = DAG(
        %s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args.copy(),
        schedule_interval='@once'
    )
    QueryValidationFlow(
        dag=inner_dag,
        validation_name=validation.validationName,
        title=validation.messageTemplate,
        query=validation.query,
        expected_result=validation.expectedResult,
        source_db=validation.source,
        emails=validation.emailRecipients.split(',')
        )
        return inner_dag


def create_subdag_operator(parent_dag, validation):
    child_dag_name = 'subdag_{}'.format(validation.validationName)
    parent_dag_name = parent_dag.dag_id
    subdag = SubDagOperator(
        task_id=child_dag_name,
        dag=parent_dag,
        subdag=create_subdag(parent_dag_name, child_dag_name, validation)
    )
    return subdag


def create_subdag_operators(parent_dag, validations):
    subdag_list = [create_subdag_operator(parent_dag, row) for index, row in validations.iterrows()]
    # chain subdag operators together
    helpers.chain(*subdag_list)
    return subdag_list


# (top-level) DAG & operators
dag = DAG(dag_id='d06-query_validations', schedule_interval='0 * * * *', 
default_args=default_args, catchup=False)

curr_validations = getValidationsToRun()

curr_validation_ids = ",".join(["'%s'" % str(validationId) for validationId in curr_validations["validationId"]])

dummy_op_start = DummyOperator(task_id='d06-op_start', dag=dag)

subdag_ops = create_subdag_operators(dag, curr_validations)

update_execution_time = MySqlOperator(
    task_id='d06-update_execution_time',
    sql=ReadTextFile('/d06- 
    query_validations/update_validations.sql').format(curr_validation_ids),
    mysql_conn_id='mysql_main',
    retries=5,
    execution_timeout=timedelta(minutes=2),
    retry_delay=60,
    dag=dag
)

dummy_op_start >> subdag_ops[0]
subdag_ops[-1] >> update_execution_time

1 Ответ

0 голосов
/ 18 декабря 2018

К вашему сведению, все в непосредственном контексте вашего файла DAG выполняется в цикле веб-сервером воздушного потока и планировщиком воздушного потока, чтобы определить, что находится в вашей группе DAG.Это происходит даже с файлами python в папке DAG, которые не производят дагс.Это также происходит с файлами DAG, для которых у DAG нет расписания или которые были отключены в пользовательском интерфейсе или базе данных.Это происходит потому, что любой файл Python может динамически создавать новый DAG.

Так что это выполняется много:

def getValidationsToRun():
  start_time = datetime.now()
  conn = MySqlHook(mysql_conn_id='mysql_main', kwargs={"charset": "utf8"})
  query = ReadTextFile('/d06-query_validations/get_validations.sql')
  logging.log(logging.INFO, "Extract Query={}".format(query))
  records = conn.get_pandas_df(query)
  logging.log(logging.INFO, "Extract completed. it took: 
  {}".format(str(datetime.now() - start_time)))
  return records

Что, я уверен, вы бы сделалипосмотрите, проверяете ли вы журналы вашего планировщика.

Я подозреваю, что иногда результаты пусты и поэтому subdag_ops[0] выходит за пределы допустимого диапазона.

Также

sql=ReadTextFile(
    '/d06-query_validations/update_validations.sql').format(curr_validation_ids),

означает, что вы не читали об использовании шаблонных полей и параметров.Вероятно, это должно быть больше похоже на

sql='./d06- 
query_validations/update_validations.sql',
params={'val_ids': curr_validation_ids},

с файлом sql, содержащим {{ params.val_ids }} где-то там.

Может быть, Астрономические документы для шаблонов помогут более чем Воздушные ?

...