Не удалось извлечь xcom из модуля воздушных потоков - Kubernetes Pod Operator - PullRequest
0 голосов
/ 27 ноября 2018

Во время работы группы обеспечения доступности баз данных, которая запускает флягу с использованием образа докера, указывается
xcom_push = True , который создает другой контейнер вместе с образом докера в одном модуле.

DAG:

jar_task = KubernetesPodOperator(
    namespace='test',
    image="path to image",
    image_pull_secrets="secret",
    image_pull_policy="Always",
    node_selectors={"d-type":"na-node-group"},
    cmds=["sh","-c",..~running jar here~..],
    secrets=[secret_file],
    env_vars=environment_vars,
    labels={"k8s-app": "airflow"},
    name="airflow-pod",
    config_file=k8s_config_file,
    resources=pod.Resources(request_cpu=0.2,limit_cpu=0.5,request_memory='512Mi',limit_memory='1536Mi'),
    in_cluster=False,
    task_id="run_jar",
    is_delete_operator_pod=True,
    get_logs=True,
    xcom_push=True,
    dag=dag)

Вот ошибки при успешном выполнении JAR ..

    [2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,605] {{pod_launcher.py:166}} INFO - Running command... cat /airflow/xcom/return.json
    [2018-11-27 11:37:21,605] {{logging_mixin.py:95}} INFO - 
    [2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,646] {{pod_launcher.py:173}} INFO - cat: can't open '/airflow/xcom/return.json': No such file or directory
    [2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - 
    [2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - [2018-11-27 11:37:21,647] {{pod_launcher.py:166}} INFO - Running command... kill -s SIGINT 1
    [2018-11-27 11:37:21,647] {{logging_mixin.py:95}} INFO - 
    [2018-11-27 11:37:21,702] {{models.py:1760}} ERROR - Pod Launching failed: Failed to extract xcom from pod: airflow-pod-hippogriff-a4628b12
    Traceback (most recent call last):
      File "/usr/local/airflow/operators/kubernetes_pod_operator.py", line 126, in execute
        get_logs=self.get_logs)
      File "/usr/local/airflow/operators/pod_launcher.py", line 90, in run_pod
        return self._monitor_pod(pod, get_logs)
      File "/usr/local/airflow/operators/pod_launcher.py", line 110, in _monitor_pod
        result = self._extract_xcom(pod)
      File "/usr/local/airflow/operators/pod_launcher.py", line 161, in _extract_xcom
        raise AirflowException('Failed to extract xcom from pod: {}'.format(pod.name))
    airflow.exceptions.AirflowException: Failed to extract xcom from pod: airflow-pod-hippogriff-a4628b12

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/operators/kubernetes_pod_operator.py", line 138, in execute
        raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
    airflow.exceptions.AirflowException: Pod Launching failed: Failed to extract xcom from pod: airflow-pod-hippogriff-a4628b12
    [2018-11-27 11:37:21,704] {{models.py:1789}} INFO - All retries failed; marking task as FAILED

Ответы [ 3 ]

0 голосов
/ 01 декабря 2018

Если xcom_push - Истина, тогда KubernetesPodOperator создает еще один контейнер с коляской (airflow-xcom-sidecar) в Pod вместе с базовым контейнером (фактическим рабочим контейнером).Этот контейнер с коляской читает данные из /airflow/xcom/return.json и возвращает значение xcom.Поэтому в ваш базовый контейнер вам нужно записать данные, которые вы хотите вернуть, в файл /airflow/xcom/return.json.

0 голосов
/ 01 августа 2019

Я хочу указать на ошибку, с которой я столкнулся в отношении xcom и KubernetesPodOperator, хотя это не было той же причиной, что и OP.На всякий случай, если кто-нибудь наткнется на этот вопрос, так как это единственный вопрос, касающийся КПО и XCom.

Я использую Google Cloud Platform (GCP) Cloud Composer , он использует немного более старую версию, чем последняя версия Airflow, поэтому, когда я ссылаюсь на официальный GitHub, он упоминает об использовании do_xcom_push, тогда как старый Airflow использует вместо arg xcom_push!

0 голосов
/ 29 ноября 2018

Это произошло из-за того, что результат выполнения задачи не был передан в xcom по ожидаемому пути, требуемому плагином KubernetesPodOperator.Взгляните на следующий модульный тест из репозитория Airflow, чтобы проверить, как он должен быть реализован (для вашего удобства ниже приведен фрагмент исходного кода, а затем ссылка на репозиторий):

    def test_xcom_push(self):
        return_value = '{"foo": "bar"\n, "buzz": 2}'
        k = KubernetesPodOperator(
            namespace='default',
            image="ubuntu:16.04",
            cmds=["bash", "-cx"],
            arguments=['echo \'{}\' > /airflow/xcom/return.json'.format(return_value)],
            labels={"foo": "bar"},
            name="test",
            task_id="task",
            xcom_push=True
        )
        self.assertEqual(k.execute(None), json.loads(return_value))

https://github.com/apache/incubator-airflow/blob/36f3bfb0619cc78698280f6ec3bc985f84e58343/tests/contrib/minikube/test_kubernetes_pod_operator.py#L321

edit : стоит упомянуть, что результат, отправленный в xcom, должен быть json.

...