Airflow KubernetesExecutor: worker pod exits right after it is created
2 votes
/ 06 August 2020

When trying to run a dag with the KubernetesExecutor, the worker pod fails with an exception immediately after it starts.

I also have a question: why does the scheduler send LocalExecutor as an env variable (it can be seen in the pod describe result below), and is this the correct behavior?

Please find all the required files below:

  1. airflow.cfg
  2. worker pod describe result
  3. worker pod logs
  4. dag file

Worker pod describe result:

Name:         tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac
Namespace:    default
Priority:     0
Node:         worker01/<node-ip>
Start Time:   <date-time>
Labels:       airflow-worker=<airflow-dummy>
              airflow_version=1.10.11
              dag_id=tutorial_v01
              execution_date=
              kubernetes_executor=True
              task_id=print_hello
              try_number=1
Annotations:  <none>
Status:       Failed
IP:           <Node Ip>
IPs:
  IP:  <Node Ip>
Containers:
  base:
    Container ID:  <container-id>
    Image:         <repo-name>/k8-airflow:latest
    Image ID:      docker-pullable://<repo-name>/k8-
    Port:          <none>
    Host Port:     <none>
    Command:
      airflow
      run
      tutorial_v01
      print_hello
      <date time>
      --local
      --pool
      default_pool
      -sd
      /usr/local/airflow/dags/tutorial_01.py
    State:          Terminated
      Reason:       Error
      Exit Code:    1
      Started:      Thu, 06 Aug 2020 13:20:21 +0000
      Finished:     Thu, 06 Aug 2020 13:20:22 +0000
    Ready:          False
    Restart Count:  0
    Environment Variables from:
      airflow-configmap  ConfigMap  Optional: false
    Environment:
      AIRFLOW__CORE__EXECUTOR:          LocalExecutor
      AIRFLOW__CORE__DAGS_FOLDER:       /usr/local/airflow/dags/repo/
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:  <alchemy-postgres-conn-url>
    Mounts:
      /usr/local/airflow/dags from airflow-dags (ro)
      /usr/local/airflow/logs from airflow-logs (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-sdfdfdd (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  airflow-dags:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-dags
    ReadOnly:   false
  airflow-logs:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
    SizeLimit:  <unset>
  default-token-mnh2t:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-mnh2t
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason     Age        From               Message
  ----    ------     ----       ----               -------
  Normal  Scheduled  <unknown>  default-scheduler  Successfully assigned default/tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac to worker01
  Normal  Pulling    8m4s       kubelet, worker01  Pulling image "<repo-name>/k8-airflow:latest"
  Normal  Pulled     8m1s       kubelet, worker01  Successfully pulled image "<repo-name>/k8-airflow:latest"
  Normal  Created    8m1s       kubelet, worker01  Created container base
  Normal  Started    8m1s       kubelet, worker01  Started container base

Worker pod logs:

  File "/usr/bin/airflow", line 25, in <module>
    from airflow.configuration import conf
  File "/usr/lib/python3.6/site-packages/airflow/__init__.py", line 31, in <module>
    from airflow.utils.log.logging_mixin import LoggingMixin
  File "/usr/lib/python3.6/site-packages/airflow/utils/__init__.py", line 24, in <module>
    from .decorators import apply_defaults as _apply_defaults
  File "/usr/lib/python3.6/site-packages/airflow/utils/decorators.py", line 36, in <module>
    from airflow import settings
  File "/usr/lib/python3.6/site-packages/airflow/settings.py", line 37, in <module>
    from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG  # NOQA F401
  File "/usr/lib/python3.6/site-packages/airflow/configuration.py", line 636, in <module>
    with open(TEST_CONFIG_FILE, 'w') as f:
PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/unittests.cfg'

Please find airflow.cfg:

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  labels:
    env: airflow-test
data:
  airflow.cfg: |
    [core]
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    logging_level = INFO
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = False
    plugins_folder = /usr/local/airflow/plugins
    sql_alchemy_conn = postgresql+psycopg2://<username>:<pwd>@airflow-metastore:5432/airflow

    [celery]
    broker_url =
    result_backend =

    [webserver]
    base_url = http://0.0.0.0:8080
    rbac=False
    web_server_host = 0.0.0.0
    web_server_port = 8080
    dag_default_view = tree

    [kubernetes]
    namespace = default
    airflow_configmap =
    worker_service_account_name = default
    worker_container_image_pull_policy = Always
    worker_dags_folder = /usr/local/airflow/dags
    worker_container_repository = <repo-name>/k8-airflow
    worker_container_tag = latest
    delete_worker_pods = false
    env_from_configmap_ref = airflow-configmap
    git_repo = https://github.com/<repo-name>/airflow-dags
    git_branch = master
    git_sync_credentials_secret = git-credentials
    git_sync_root = /tmp/git
    git_dags_folder_mount_point = /usr/local/airflow/dags
    git_sync_container_repository = <repo-name>/git-sync
    git_sync_container_tag = latest
    git_sync_init_container_name = git-sync-clone
    dags_volume_claim = airflow-dags
    in_cluster = True
    dags_volume_subpath =
    dags_volume_mount_point =


    [kubernetes_environment_variables]
    AIRFLOW__CORE__EXECUTOR = KubernetesExecutor
    AIRFLOW__CORE__DAGS_FOLDER = /usr/local/airflow/dags


    [admin]
    hide_sensitive_variable_fields = True

And the Kubernetes file:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
subjects:
  - kind: ServiceAccount
    name: default # Name of the ServiceAccount
    namespace: default
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: default # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  replicas: 1
  selector:
    matchLabels:
      env: airflow-test
  template:
    metadata:
      labels:
        env: airflow-test
    spec:
      initContainers:
        - name: "init"
          image: <repo-name>/k8-airflow
          imagePullPolicy: Always
          volumeMounts:
            - name: airflow-configmap
              mountPath: /usr/local/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /usr/local/airflow/dags
          env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          command:
            - "bash"
          args:
            - "-cx"
            - "initdb.sh"

      containers:
        - name: webserver
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "webserver"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
        - name: scheduler
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "scheduler"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
      volumes:
        - name: airflow-configmap
          configMap:
            name: airflow-configmap
        - name: airflow-dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs

---

apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  type: NodePort
  ports:
    - name: webserver
      protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30003
  selector:
    env: airflow-test

Dag file:

import datetime as dt  # used for start_date / retry_delay in default_args

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def print_world():
    print('world_1')

default_args = {
    'start_date': dt.datetime(2020, 8, 6, 9, 45, 0),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('tutorial_v01',
         default_args=default_args,
         schedule_interval='*/30 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)

print_hello >> sleep >> print_world 

I have already given 777 permissions on /usr/local/airflow/ in the image being used. Please let me know if anything else is required.
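
For reference, the permissions were applied in the image roughly like this (a hypothetical excerpt; the actual Dockerfile is not shown in this question):

# hypothetical excerpt from the worker image
ENV AIRFLOW_HOME=/usr/local/airflow
# open up AIRFLOW_HOME so any user inside the container can write to it
RUN mkdir -p ${AIRFLOW_HOME} && chmod -R 777 ${AIRFLOW_HOME}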

1 Answer

1 vote
/ 08 August 2020

K8s runs Airflow as a docker container. When you spin up the container, you need to run it as the airflow user.

This can be achieved in your Dockerfile: you can tell it which user to run as. Please let me know if you want to know more about this.
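
For example, a minimal Dockerfile sketch of that idea, assuming the image creates an airflow user and that AIRFLOW_HOME is /usr/local/airflow (the base image, UID and paths below are placeholders, adjust them to your image):

# hypothetical excerpt, not the actual Dockerfile from the question
FROM python:3.6-slim

ENV AIRFLOW_HOME=/usr/local/airflow

# create a dedicated airflow user and give it ownership of AIRFLOW_HOME,
# so the worker can write files such as unittests.cfg there
RUN useradd -ms /bin/bash -u 1000 airflow \
    && mkdir -p ${AIRFLOW_HOME} \
    && chown -R airflow: ${AIRFLOW_HOME}

# ... install airflow, copy dags/entrypoint, etc. ...

# run containers from this image as the airflow user instead of root
USER airflow

Alternatively, the [kubernetes] section of airflow.cfg has run_as_user and fs_group options that set the worker pod's securityContext to a given UID, which achieves a similar effect without rebuilding the image.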

Also, regarding the problem you mentioned above, please refer to this:

https://issues.apache.org/jira/browse/AIRFLOW-6754

Hope this answers your questions. Let me know.
