Так что я боролся с этим часами. Это код моего оператора:
task1 = GKEPodOperator(
task_id="task1",
project_id="proj",
location="location",
cluster_name="cluster_name",
name="cluster-calculator",
namespace="default",
image=Variable.get("cluster_calculator_image"),
arguments=['--name clustercalculator'],
env_vars=env_vars,
xcom_push=True,
is_delete_operator_pod=True,
get_logs=True,
dag=dag
)
Модуль запускает простой докер-контейнер с Java-приложением, выполняющим некоторые вещи и записывающим результаты по умолчанию / airflow / xcom / result.json file.
Вот как я пытаюсь получить результат xcom_push:
def print_xcom_result(*op_args, **kwargs):
print(op_args)
print(kwargs['task_instance'].xcom_pull(task_ids='task1'))
test_values = PythonOperator(
task_id="task1_test",
python_callable=print_xcom_result,
provide_context=True,
op_args=["{{task_instance.xcom_pull(task_ids='task1')}}"],
dag=dag
)
Что бы я ни пытался, оно всегда печатает None.
[2019-10-12 00:06:23,061] {{logging_mixin.py:95}} INFO - ('None',)
[2019-10-12 00:06:23,072] {{logging_mixin.py:95}} INFO - None
Когда я идук XCOM на интерфейсе Airflow ничего не показывает. Я также попробовал пример отсюда: Не удалось извлечь xcom из модуля потока воздуха - Kubernetes Pod Operator , и он тоже не работал.
Контейнер-коляска создан наверняка, и я вижу его вывод в журналах:
Running command... [1mcat /airflow/xcom/return.json[0m
Running command... [1mkill -s SIGINT 1[0m
INFO[0m - {"clusterSize":2}[0m
Я даже попытался запустить докер-контейнер извне, проверил, что результат записан в xcomкаталог правильно, но не повезло, получив этот результат во время выполнения DAG.
Последняя версия воздушного потока. Python 3.7
Если это имеет значение, у меня есть 6 контейнеров, работающих под управлением Airflow (веб-сервер, цветок, рабочий, планировщик, postgre, rabbitmq). Сельдерей является исполнителем. Блоки работают в движке Kubernetes в Google Cloud.
Ошибок нет, оба оператора успешны.
У кого-нибудь есть идеи? Заранее спасибо.