Я прочитал это Как использовать XCMS с воздушным потоком с MySqlOperator , и хотя он имеет похожее название, он не решает мою проблему.
У меня есть следующий код:
def branch_func_is_new_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
string_to_print = 'Value in xcom is: {}'.format(xcom)
logging.info(string_to_print)
if int(xcom) > int(LAST_IMPORTED_ORDER_ID)
return 'import_orders'
else:
return 'skip_operation'
query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
task_id='query_get_max_order_id',
sql= query_get_max_order_id,
mysql_conn_id=MyCon,
xcom_push=True,
dag=dag)
branch_op_is_new_records = BranchPythonOperator(
task_id='branch_operation_is_new_records',
provide_context=True,
python_callable=branch_func_is_new_records,
dag=dag)
get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
MySqlOperator
возвращает число в соответствии с номером, который BranchPythonOperator
выбирает следующую задачу.Гарантируется, что MySqlOperator
вернул значение больше 0
.
Моя проблема в том, что ничто не подталкивается к XCOM
в пользовательском интерфейсе MySqlOperator
при переходе на XCOM
Я вижуничего такого.BranchPythonOperator
очевидно ничего не читает, поэтому мой код не работает.
Почему XCOM
здесь не работает?