When I try to run a DAG with the KubernetesExecutor, the worker pod raises an exception and terminates immediately after it starts.
My question: the scheduler passes LocalExecutor to the worker as an environment variable (it is visible in the pod describe output below). Is that the correct behaviour?
Please find all the relevant files below:
- airflow.cfg
- worker pod describe output (collected as shown right after this list)
- worker pod logs
- the DAG file
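The describe output and logs below were collected straight from the failed worker pod, roughly like this (the pod name is the one Airflow generated for the task):

kubectl describe pod tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac -n default
kubectl logs tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac -n default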
Worker pod describe output:
Name: tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac
Namespace: default
Priority: 0
Node: worker01/<node-ip>
Start Time: <date-time>
Labels: airflow-worker=<airflow-dummy>
airflow_version=1.10.11
dag_id=tutorial_v01
execution_date=
kubernetes_executor=True
task_id=print_hello
try_number=1
Annotations: <none>
Status: Failed
IP: <Node Ip>
IPs:
IP: <Node Ip>
Containers:
base:
Container ID: <container-id>
Image: <repo-name>/k8-airflow:latest
Image ID: docker-pullable://<repo-name>/k8-
Port: <none>
Host Port: <none>
Command:
airflow
run
tutorial_v01
print_hello
<date time>
--local
--pool
default_pool
-sd
/usr/local/airflow/dags/tutorial_01.py
State: Terminated
Reason: Error
Exit Code: 1
Started: Thu, 06 Aug 2020 13:20:21 +0000
Finished: Thu, 06 Aug 2020 13:20:22 +0000
Ready: False
Restart Count: 0
Environment Variables from:
airflow-configmap ConfigMap Optional: false
Environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__DAGS_FOLDER: /usr/local/airflow/dags/repo/
AIRFLOW__CORE__SQL_ALCHEMY_CONN: <alchemy-postgres-conn-url>
Mounts:
/usr/local/airflow/dags from airflow-dags (ro)
/usr/local/airflow/logs from airflow-logs (rw)
/var/run/secrets/kubernetes.io/serviceaccount from default-token-sdfdfdd (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
airflow-dags:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-dags
ReadOnly: false
airflow-logs:
Type: EmptyDir (a temporary directory that shares a pod's lifetime)
Medium:
SizeLimit: <unset>
default-token-mnh2t:
Type: Secret (a volume populated by a Secret)
SecretName: default-token-mnh2t
Optional: false
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled <unknown> default-scheduler Successfully assigned default/tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac to worker01
Normal Pulling 8m4s kubelet, worker01 Pulling image "<repo-name>/k8-airflow:latest"
Normal Pulled 8m1s kubelet, worker01 Successfully pulled image "<repo-name>/k8-airflow:latest"
Normal Created 8m1s kubelet, worker01 Created container base
Normal Started 8m1s kubelet, worker01 Started container base
Worker pod logs:
File "/usr/bin/airflow", line 25, in <module>
from airflow.configuration import conf
File "/usr/lib/python3.6/site-packages/airflow/__init__.py", line 31, in <module>
from airflow.utils.log.logging_mixin import LoggingMixin
File "/usr/lib/python3.6/site-packages/airflow/utils/__init__.py", line 24, in <module>
from .decorators import apply_defaults as _apply_defaults
File "/usr/lib/python3.6/site-packages/airflow/utils/decorators.py", line 36, in <module>
from airflow import settings
File "/usr/lib/python3.6/site-packages/airflow/settings.py", line 37, in <module>
from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401
File "/usr/lib/python3.6/site-packages/airflow/configuration.py", line 636, in <module>
with open(TEST_CONFIG_FILE, 'w') as f:
PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/unittests.cfg'
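Since the failure is a write into AIRFLOW_HOME (/usr/local/airflow/unittests.cfg), a quick sanity check of the image is to look at the uid the container runs as and the mode of that directory, for example (same image as in the pod spec):

docker run --rm --entrypoint bash <repo-name>/k8-airflow:latest -c 'id; ls -ld /usr/local/airflow'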
The airflow.cfg (deployed as a ConfigMap):
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  labels:
    env: airflow-test
data:
  airflow.cfg: |
    [core]
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    logging_level = INFO
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = False
    plugins_folder = /usr/local/airflow/plugins
    sql_alchemy_conn = postgresql+psycopg2://<username>:<pwd>@airflow-metastore:5432/airflow
    [celery]
    broker_url =
    result_backend =
    [webserver]
    base_url = http://0.0.0.0:8080
    rbac = False
    web_server_host = 0.0.0.0
    web_server_port = 8080
    dag_default_view = tree
    [kubernetes]
    namespace = default
    airflow_configmap =
    worker_service_account_name = default
    worker_container_image_pull_policy = Always
    worker_dags_folder = /usr/local/airflow/dags
    worker_container_repository = <repo-name>/k8-airflow
    worker_container_tag = latest
    delete_worker_pods = false
    env_from_configmap_ref = airflow-configmap
    git_repo = https://github.com/<repo-name>/airflow-dags
    git_branch = master
    git_sync_credentials_secret = git-credentials
    git_sync_root = /tmp/git
    git_dags_folder_mount_point = /usr/local/airflow/dags
    git_sync_container_repository = <repo-name>/git-sync
    git_sync_container_tag = latest
    git_sync_init_container_name = git-sync-clone
    dags_volume_claim = airflow-dags
    in_cluster = True
    dags_volume_subpath =
    dags_volume_mount_point =
    [kubernetes_environment_variables]
    AIRFLOW__CORE__EXECUTOR = KubernetesExecutor
    AIRFLOW__CORE__DAGS_FOLDER = /usr/local/airflow/dags
    [admin]
    hide_sensitive_variable_fields = True
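To rule out the scheduler itself resolving a different executor than the one set in this file, the effective value can be checked inside the scheduler container (the pod name is a placeholder):

kubectl exec -it <airflow-pod> -c scheduler -- python -c "from airflow.configuration import conf; print(conf.get('core', 'executor'))"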
And the Kubernetes manifest:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
rules:
  - apiGroups: [""]  # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
subjects:
  - kind: ServiceAccount
    name: default  # Name of the ServiceAccount
    namespace: default
roleRef:
  kind: Role  # This must be Role or ClusterRole
  name: default  # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  replicas: 1
  selector:
    matchLabels:
      env: airflow-test
  template:
    metadata:
      labels:
        env: airflow-test
    spec:
      initContainers:
        - name: "init"
          image: <repo-name>/k8-airflow
          imagePullPolicy: Always
          volumeMounts:
            - name: airflow-configmap
              mountPath: /usr/local/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /usr/local/airflow/dags
          env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          command:
            - "bash"
          args:
            - "-cx"
            - "initdb.sh"
      containers:
        - name: webserver
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "webserver"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
        - name: scheduler
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "scheduler"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
      volumes:
        - name: airflow-configmap
          configMap:
            name: airflow-configmap
        - name: airflow-dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  type: NodePort
  ports:
    - name: webserver
      protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30003
  selector:
    env: airflow-test
The DAG file:
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def print_world():
    print('world_1')


default_args = {
    'start_date': dt.datetime(2020, 8, 6, 9, 45, 0),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('tutorial_v01',
         default_args=default_args,
         schedule_interval='*/30 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)

    print_hello >> sleep >> print_world
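As a minimal sanity check that the DAG file imports cleanly (it does need the datetime import shown above), it can simply be run through the interpreter at the path the worker uses:

python /usr/local/airflow/dags/tutorial_01.py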
I have already granted 777 on /usr/local/airflow/ in the image that is used (see the build-step sketch below). Please let me know if anything else is needed.
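For reference, the permission change mentioned above is, in essence, a build step like this (a sketch; the actual Dockerfile step may differ):

# executed while building <repo-name>/k8-airflow
chmod -R 777 /usr/local/airflow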