Мое требование - выполнять задачи на основе параметров, указанных в файле конфигурации, и в настоящее время я планирую добиться этого с помощью двух групп обеспечения доступности баз данных - scheduler.py
и program.py
. Вот пример файла конфигурации.
[Task 1]
start_date : 2020-04-19 06:30
end_date : 2020-04-28 20:30
day : TUE
timezone : UTC
[Task 2]
start_date : 2020-04-26 06:30
end_date : 2020-07-28 06:30
day : MON
timezone : UTC
[Task 3]
start_date : 2020-04-26 06:00
end_date : 2020-07-28 06:30
day : MON
timezone : UTC
Планируется, что группа обеспечения доступности баз данных scheduler
будет запускаться каждый час, и она проверяет допустимые задачи, которые должны выполняться между execute_date и next_execution_date. Например, запуск DAG планировщика на 2020-04-26T06: 00: 00 + 00: 00 будет считать Задачи - Задача 2 и Задача 3 действительными. Моя идея - передать эти значения в качестве входных данных программе DAG и вызвать необходимые операторы.
Для достижения этой цели мой текущий подход заключается в использовании PythonOperator для перебора всех записей в файле конфигурации и поиска допустимых задач для выполнения в текущем цикле выполнения. Создается значение XCom
для DAG, Task и Execution_Date.
Теперь я хочу создать динамические задачи c TriggerDagRunOperator
s на основе записей в значении XCom, что я не могу.
Ниже мой scheduler
DAG и program
DAG.
планировщик DAG:
def process_config(**kwargs):
print("Date Time Now : {}".format(datetime.now(timezone('GMT'))))
print(default_args['unique_dag_run_identifer'])
current_run = kwargs.get('execution_date', datetime.now(timezone('GMT')))
next_run = current_run + timedelta(minutes=59)
config_file = '{}/chatter_program.config'.format(etl_root_directory)
config = cp.ConfigParser()
config.read(config_file, encoding='utf-8-sig')
programs = config.sections()
print("Available Programs {}".format(programs))
valid_programs = []
print("Current run (rounded off) : {}, Next run : {}.".format(current_run, next_run))
for program in programs:
print("Analysing program {}".format(program))
program_timezone = timezone(config.get(program, 'timezone'))
day = config.get(program, 'day')
start_date = (datetime.strptime(config.get(program, 'start_date'), '%Y-%m-%d %H:%M'))
start_date = program_timezone.localize(start_date).astimezone(timezone('GMT'))
end_date = (datetime.strptime(config.get(program, 'end_date'), '%Y-%m-%d %H:%M'))
end_date = program_timezone.localize(end_date).astimezone(timezone('GMT'))
print("Program Start Date : {}, End Date : {}, Execution Day : {}".format(start_date, end_date, day))
time_range = pandas.date_range(start_date, end_date, freq='D')
time_range = [x.to_pydatetime() for x in time_range]
can_run_this_hour = False
for time in time_range:
if current_run <= time <= next_run:
can_run_this_hour = True
break
if can_run_this_hour and start_date.weekday() == current_run.weekday():
valid_program = {'program': program,
'start_date': start_date,
'end_date': end_date,
'wait_time': time - current_run}
print("Program {} is valid. Details : {}".format(program, valid_program))
valid_programs.append(valid_program)
print("Valid programs that can run now : {}".format(valid_programs))
return valid_programs
execution_date = '{{ execution_date }}'
get_programs = PythonOperator(task_id='get_programs', python_callable=process_config, provide_context=True, dag=dag)
программа DAG (фиктивный код на данный момент):
def run_this_func(**context):
print("Remotely received value of {} for key=message".format(context))
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
I Пробовал следующие варианты, но ничего не помогло:
Доступ к значению XCom, итерация и добавление динамических c задач. Примерно так:
def python_call2(trigger_dags, **kwargs):
print(trigger_dags)
for trigger_dag in trigger_dags:
program_name = trigger_dag['program'].replace(' ', '_')
print(trigger_dag)
trigger_dag = TriggerDagRunOperator(task_id="trigger_dag_{}".format(program_name),
trigger_dag_id="data_com_chatter_program",
dag=dag
)
get_programs.set_downstream(trigger_dag)
Но это не работает, поскольку Airflow не позволяет создавать задачи внутри метода PythonOperator.
Доступ к значению XCom в группе обеспечения доступности баз данных, но не в рамках какой-либо задачи, что-то вроде этого:
valid_programs = XCom.get_one(execution_date=execution_date,
task_id='get_programs',
dag_id='scheduler',
include_prior_dates=True)
Но я невозможно получить значение переменной execute_date.
Я не могу использовать значение XCom без execution_date
, поскольку моя программа DAG может работать дольше, что приведет к запуску следующего цикл. Это может обновить значения XCom. Итак, я должен получить значение XCom с этой даты исполнения, в этом случае 2020-04-26T06:00:00+00:00
Может кто-нибудь подсказать, как мне этого добиться или, если мой подход не верен, предложите правильный подход.