Передать значения между группами DAG в AIRFLOW - PullRequest
0 голосов
/ 26 апреля 2020

Мое требование - выполнять задачи на основе параметров, указанных в файле конфигурации, и в настоящее время я планирую добиться этого с помощью двух групп обеспечения доступности баз данных - scheduler.py и program.py. Вот пример файла конфигурации.

[Task 1]
start_date : 2020-04-19 06:30
end_date : 2020-04-28 20:30
day : TUE
timezone : UTC

[Task 2]
start_date : 2020-04-26 06:30
end_date : 2020-07-28 06:30
day : MON
timezone : UTC

[Task 3]
start_date : 2020-04-26 06:00
end_date : 2020-07-28 06:30
day : MON
timezone : UTC

Планируется, что группа обеспечения доступности баз данных scheduler будет запускаться каждый час, и она проверяет допустимые задачи, которые должны выполняться между execute_date и next_execution_date. Например, запуск DAG планировщика на 2020-04-26T06: 00: 00 + 00: 00 будет считать Задачи - Задача 2 и Задача 3 действительными. Моя идея - передать эти значения в качестве входных данных программе DAG и вызвать необходимые операторы.

Для достижения этой цели мой текущий подход заключается в использовании PythonOperator для перебора всех записей в файле конфигурации и поиска допустимых задач для выполнения в текущем цикле выполнения. Создается значение XCom для DAG, Task и Execution_Date.

Теперь я хочу создать динамические задачи c TriggerDagRunOperator s на основе записей в значении XCom, что я не могу.

Ниже мой scheduler DAG и program DAG.

планировщик DAG:

def process_config(**kwargs):
    print("Date Time Now : {}".format(datetime.now(timezone('GMT'))))
    print(default_args['unique_dag_run_identifer'])
    current_run = kwargs.get('execution_date', datetime.now(timezone('GMT')))
    next_run = current_run + timedelta(minutes=59)

    config_file = '{}/chatter_program.config'.format(etl_root_directory)
    config = cp.ConfigParser()
    config.read(config_file, encoding='utf-8-sig')
    programs = config.sections()
    print("Available Programs {}".format(programs))
    valid_programs = []
    print("Current run (rounded off) : {}, Next run : {}.".format(current_run, next_run))
    for program in programs:
        print("Analysing program {}".format(program))
        program_timezone = timezone(config.get(program, 'timezone'))
        day = config.get(program, 'day')

        start_date = (datetime.strptime(config.get(program, 'start_date'), '%Y-%m-%d %H:%M'))
        start_date = program_timezone.localize(start_date).astimezone(timezone('GMT'))

        end_date = (datetime.strptime(config.get(program, 'end_date'), '%Y-%m-%d %H:%M'))
        end_date = program_timezone.localize(end_date).astimezone(timezone('GMT'))
        print("Program Start Date : {}, End Date : {}, Execution Day : {}".format(start_date, end_date, day))

        time_range = pandas.date_range(start_date, end_date, freq='D')
        time_range = [x.to_pydatetime() for x in time_range]
        can_run_this_hour = False
        for time in time_range:
            if current_run <= time <= next_run:
                can_run_this_hour = True
                break
        if can_run_this_hour and start_date.weekday() == current_run.weekday():
            valid_program = {'program': program,
                             'start_date': start_date,
                             'end_date': end_date,
                             'wait_time': time - current_run}
            print("Program {} is valid. Details : {}".format(program, valid_program))
            valid_programs.append(valid_program)
    print("Valid programs that can run now : {}".format(valid_programs))
    return valid_programs


execution_date = '{{ execution_date }}'
get_programs = PythonOperator(task_id='get_programs', python_callable=process_config, provide_context=True, dag=dag)

программа DAG (фиктивный код на данный момент):

def run_this_func(**context):
    print("Remotely received value of {} for key=message".format(context))


run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)

I Пробовал следующие варианты, но ничего не помогло:

Доступ к значению XCom, итерация и добавление динамических c задач. Примерно так:

def python_call2(trigger_dags, **kwargs):
    print(trigger_dags)
    for trigger_dag in trigger_dags:
        program_name = trigger_dag['program'].replace(' ', '_')
        print(trigger_dag)
        trigger_dag = TriggerDagRunOperator(task_id="trigger_dag_{}".format(program_name),
                                            trigger_dag_id="data_com_chatter_program",
                                            dag=dag
                                            )
        get_programs.set_downstream(trigger_dag)

Но это не работает, поскольку Airflow не позволяет создавать задачи внутри метода PythonOperator.

Доступ к значению XCom в группе обеспечения доступности баз данных, но не в рамках какой-либо задачи, что-то вроде этого:

valid_programs = XCom.get_one(execution_date=execution_date,
                                    task_id='get_programs',
                                    dag_id='scheduler',
                                    include_prior_dates=True)

Но я невозможно получить значение переменной execute_date.

Я не могу использовать значение XCom без execution_date, поскольку моя программа DAG может работать дольше, что приведет к запуску следующего цикл. Это может обновить значения XCom. Итак, я должен получить значение XCom с этой даты исполнения, в этом случае 2020-04-26T06:00:00+00:00

Может кто-нибудь подсказать, как мне этого добиться или, если мой подход не верен, предложите правильный подход.

...