Воздушный поток KubernetesPodOperator в GKE / GCP не запускает пользовательские модули - PullRequest
0 голосов
/ 03 июня 2019

Мы запускаем Airflow 1.10.2 с KubernetesExecutor с автономным управлением в кластере GKE в GCP. Пока что все внутренние операторы работают нормально, кроме KubernetesPodOperator, который мы хотели бы использовать для запуска наших пользовательских образов Docker. Кажется, что рабочие образы Airflow не имеют прав для запуска других модулей в кластере Kubernetes. Похоже, что DAG ничего не делает после запуска. Это то, что мы нашли в логах изначально:

FileNotFoundError: [Errno 2] No such file or directory: '/root/.kube/config'

Следующая попытка - in_cluster=True параметр в разделе KubernetesPodOperator, похоже, не помогает. После этого мы попытались использовать этот параметр в airflow.cfg, раздел [kubernetes]:

gcp_service_account_keys = kubernetes-executor-private-key:/var/tmp/private/kubernetes_executor_private_key.json

и сообщение об ошибке теперь TypeError: a bytes-like object is required, not 'str' Это определение параметра из github:

# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors
# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
gcp_service_account_keys =

Уже пытались использовать различные виды скобок и кавычек, но безуспешно.

код DAG:

from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'xxx',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          in_cluster=True,
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          in_cluster=True,
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

Кто-нибудь сталкивался с такой же проблемой? Я что-то здесь упускаю?

...