Как создать kubeconfig для рабочего модуля Kubernetes Airflow, запускающего KubernetesPodOperator - PullRequest
0 голосов
/ 24 января 2019

Я настраиваю Airflow в Kubernetes Engine, и теперь у меня есть следующие (работающие) модули:

  • postgres (со смонтированным PersistentVolumeClaim)
  • flower
  • web (панель управления воздушным потоком)
  • rabbitmq
  • планировщик
  • работник

Из Airflow я хотел бы запуститьзадача запуска модуля, который - в данном случае - загружает какой-либо файл с SFTP-сервера.Однако KubernetesPodOperator в Airflow, который должен запустить этот новый модуль, не может быть запущен, потому что kubeconfig не может быть найден.

Рабочий Airflow настроен, как показано ниже.Другие модули Airflow точно такие же, за исключением args.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: worker
    spec:
      restartPolicy: Always
      containers:
        - name: worker
          image: my-gcp-project/kubernetes-airflow-in-container-registry:v1
          imagePullPolicy: IfNotPresent
          env:
            - name: AIRFLOW_HOME
              value: "/usr/local/airflow"
          args: ["worker"]

. KubernetesPodOperator настроен следующим образом:

maybe_download = KubernetesPodOperator(
    task_id='maybe_download_from_sftp',
    image='some/image:v1',
    namespace='default',
    name='maybe-download-from-sftp',
    arguments=['sftp_download'],
    image_pull_policy='IfNotPresent',
    dag=dag,
    trigger_rule='dummy',
)

Следующая ошибка показывает, что kubeconfig не включенpod.

[2019-01-24 12:37:04,706] {models.py:1789} INFO - All retries failed; marking task as FAILED
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp Traceback (most recent call last):
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/bin/airflow", line 32, in <module>
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     args.func(args)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return f(*args, **kwargs)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 490, in run
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     _run(args, dag, ti)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 406, in _run
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     pool=args.pool,
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return func(*args, **kwargs)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     result = task_copy.execute(context=context)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 90, in execute
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config_file=self.config_file)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 51, in get_kube_client
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return _load_kube_config(in_cluster, cluster_context, config_file)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 38, in _load_kube_config
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config.load_kube_config(config_file=config_file, context=cluster_context)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/airflow/.local/lib/python3.6/site-packages/kubernetes/config/kube_config.py", line 537, inload_kube_config
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config_persister=config_persister)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/airflow/.local/lib/python3.6/site-packages/kubernetes/config/kube_config.py", line 494, in_get_kube_config_loader_for_yaml_file
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     with open(filename) as f:
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/.kube/config'
[2019-01-24 12:37:08,300] {logging_mixin.py:95} INFO - [2019-01-24 12:37:08,299] {jobs.py:2627} INFO - Task exited with return code 1

Я бы хотел, чтобы модуль запускался и "автоматически" содержал контекст кластера Kubernetes, в котором он находится - если это имеет смысл.Я чувствую, что упускаю что-то фундаментальное.Может ли кто-нибудь помочь?

1 Ответ

0 голосов
/ 26 января 2019

Как описано в Руководстве по тонкой настройке , вы хотите, чтобы in_cluster=True уведомил КПО о том, что он фактически находится в кластере.

Я бы действительно рекомендовал регистрировать ошибкус Airflow, потому что Airflow может тривиально обнаружить тот факт, что он работает внутри кластера, и должен иметь гораздо более вменяемое значение по умолчанию, чем ваш опыт.

...