Я пытаюсь перенести развертывание воздушного потока, работающего в kubernetes, с CeleryExecutor
на KubernetesExecutor
. Все прошло гладко в моей локальной среде разработки (работающей на мини-кубе), однако мне нужно загрузить контейнер с коляской в производстве, чтобы запустить прокси, который позволяет мне подключаться к моей базе данных sql. После некоторого поиска в Google кажется, что определение функции pod_mutation_hook в файле airflow_local_settings.py
где-то в $PYTHONPATH
- это то, как нужно выполнить sh.
Сначала я попытался определить это в конфигурации карты для это пример. например,
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-config
namespace: dev
data:
...
AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM: "airflow-logs"
AIRFLOW__KUBERNETES__AIRFLOW_LOCAL_SETTINGS_CONFIGMAP: "airflow-config"
...
airflow_local_settings.py: |
from airflow.contrib.kubernetes.pod import Pod
def pod_mutation_hook(pod: Pod):
extra_labels = {
"test-label": "True",
}
pod.labels.update(extra_labels)
Я указал этот config-файл в файле airflow.cfg
, и он отлично подобрался и смонтирован, все остальные переменные env работают правильно, но pod_mutation_hook
не работает, поскольку нет меток добавляются в полученный модуль, запущенный исполнителем kubernetes (обратите внимание, что здесь также указана заявка на объем журналов, и она работает правильно).
Далее я попытался определить файл airflow_local_settings.py
на изображении, который запускается потоком воздуха для задания под $AIRFLOW_HOME/configs/airflow_local_settings.py
, как предлагается в комментарии здесь . Я также удалил соответствующие разделы из конфигурации airflow-config
выше. Это также, похоже, не повлияло на полученный модуль, созданный для задания, так как в нем также отсутствовали указанные метки.
Итак, я не уверен, как действовать в этом пункте, потому что я не понимаю, как я я должен указать файл airflow_local_settings.py
и функцию pod_mutation_hook
так, чтобы они действительно изменяли модуль перед запуском. Любая помощь будет принята с благодарностью. Спасибо.