Воздушный поток GKEPodOperator xcom_push возвращает None - PullRequest
0 голосов
/ 12 октября 2019

Так что я боролся с этим часами. Это код моего оператора:

task1 = GKEPodOperator(
        task_id="task1",
        project_id="proj",
        location="location",
        cluster_name="cluster_name",
        name="cluster-calculator",
        namespace="default",
        image=Variable.get("cluster_calculator_image"),
        arguments=['--name clustercalculator'],
        env_vars=env_vars,
        xcom_push=True,
        is_delete_operator_pod=True,
        get_logs=True,
        dag=dag
    )

Модуль запускает простой докер-контейнер с Java-приложением, выполняющим некоторые вещи и записывающим результаты по умолчанию / airflow / xcom / result.json file.

Вот как я пытаюсь получить результат xcom_push:

def print_xcom_result(*op_args, **kwargs):
        print(op_args)
        print(kwargs['task_instance'].xcom_pull(task_ids='task1'))

    test_values = PythonOperator(
        task_id="task1_test",
        python_callable=print_xcom_result,
        provide_context=True,
        op_args=["{{task_instance.xcom_pull(task_ids='task1')}}"],
        dag=dag
    )

Что бы я ни пытался, оно всегда печатает None.

[2019-10-12 00:06:23,061] {{logging_mixin.py:95}} INFO - ('None',)
[2019-10-12 00:06:23,072] {{logging_mixin.py:95}} INFO - None

Когда я идук XCOM на интерфейсе Airflow ничего не показывает. Я также попробовал пример отсюда: Не удалось извлечь xcom из модуля потока воздуха - Kubernetes Pod Operator , и он тоже не работал.

Контейнер-коляска создан наверняка, и я вижу его вывод в журналах:

Running command... [1mcat /airflow/xcom/return.json[0m
Running command... [1mkill -s SIGINT 1[0m
INFO[0m - {"clusterSize":2}[0m

Я даже попытался запустить докер-контейнер извне, проверил, что результат записан в xcomкаталог правильно, но не повезло, получив этот результат во время выполнения DAG.

Последняя версия воздушного потока. Python 3.7

Если это имеет значение, у меня есть 6 контейнеров, работающих под управлением Airflow (веб-сервер, цветок, рабочий, планировщик, postgre, rabbitmq). Сельдерей является исполнителем. Блоки работают в движке Kubernetes в Google Cloud.

Ошибок нет, оба оператора успешны.

У кого-нибудь есть идеи? Заранее спасибо.

...