Я боролся с ошибкой шаблонизации Jinja в composer-1.5.2-airflow-1.10.0.
Мне нужен был оператор K8SO в DAG для запуска ежедневного задания потока данных из опубликованного шаблона через CI/ CD после ежедневной передачи файлов с FTP и после начала работы, мне нужно было проверить статус работы через REST API.
В моей DAG я полагаюсь на образ Docker Python для запуска GCP REST API через Python Google Client.
Вот мой пример дескриптора yaml ниже:
yaml_dataflow_create_job_from_template= """
containers:
- name: load-data
image: registry.xxx.yyyy.com/docker-images/python-gcloud:latest
resources:
requests:
memory: "256Mi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "2"
args:
- python
- "-c"
- |
import os
import io
import json
import google.cloud.storage.client as storage
from googleapiclient.discovery import build
service = build('dataflow', 'v1b3')
GCSPATH="gs://{{params.bucket}}/templates/{{params.template_name}}"
BODY = (
'{'
' "jobName": "{{params.date}}-dataflow-job",'
' "parameters": {'
' "inputLogFile" : "gs://project-bucket/input/{{ params.yesterday_date }}.file.fr.log.gz",'
' "outputLogsByDayTable": "{{params.dataByDayTable}}",'
' "outputLogsByWeekTable": "{{params.dataByWeekTable}}",'
' "outputLogsByMonthTable": "{{params.dataByMonthTable}}",'
' "outputRawLogsTable": "{{params.dataRawTable}}"'
' },'
' "environment": {'
' "serviceAccountEmail": "{{params.sac}}",'
' "tempLocation": "{{params.templocation}}",'
' "zone": "{{params.zone}}",'
' "network": "shared",'
' "subnetwork": "{{params.network}}"'
' }'
'}'
)
request = service.projects().locations().templates().launch(projectId="{{params.project_id}}", location="{{params.region}}", gcsPath=GCSPATH, body=json.loads(BODY))
response = request.execute()
print("send : ")
print("response : "+ json.dumps(response))
job_id = ""
try:
status = response['job']['currentState']
except KeyError:
job_id = response['job']['id']
import json
import time
print("Start :" + time.ctime())
count = 0
status = 'unknown'
statusRequest = service.projects().locations().jobs().get(projectId="{{params.project_id}}", location="{{params.region}}", jobId=job_id)
while (status == "unknown" and count <=200):
time.sleep( 5 )
statusResponse = statusRequest.execute()
status = statusResponse['job']['currentState']
print("Request#"+str(count)+" job("+job_id+") status :"+ status)
count = count + 1
try:
status = response['job']['currentState']
except KeyError:
status = 'unknown'
print("Request#"+str(count)+" job("+job_id+") status :"+ status)
print("End : " + time.ctime())
volumeMounts:
- name: google-cloud-key
mountPath: /var/secrets/google
readOnly: true
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /var/secrets/google/credentials-keyfile.json
imagePullSecrets:
- name: gitlab-key
volumes:
- name: google-cloud-key
secret:
secretName: credentials-keyfile
"""
Вот мой K8sOperator
dataflow_job = K8SJobOperator(task_id="dataflow_daily_job",
location=location,
project_id=host_project,
cluster_name=host_cluster,
name="dataflow_daily_job",
gcp_conn_id='gcp_kub_runner',
params={"yesterday_date": yesterdayds, "date": executiondate, "bucket": SEOLOG_SOURCING_BUCKET, "template_name" : "DataflowTmpl" , "sac" : 'service-account@project.iam.gserviceaccount.com', "job_name" : '{executiondate}-Dataflow-job'.format(executiondate=executiondate), "inputLogFile" : 'gs://project-bucket/input/{yesterdayds}.file.fr.log.gz'.format(yesterdayds=yesterdayds), "outputLogsByDayTable" : "project:dataset.DATA_BYDAY", "outputLogsByWeekTable" : "project:dataset.DATA_BYWEEK", "outputLogsByMonthTable" : "project:dataset.DATA_BYMONTH", "outputRawLogsTable" : "project:dataset.RAW_DATA" ,"templocation" : "gs://project-bucket/input/temp/", "region": Variable.get('REGION'), "zone": Variable.get('LOCATION'), "project_id" : Variable.get('DP_PROJECT'), "network" : "https://www.googleapis.com/compute/v1/{subnet}".format(dpsubnet=Variable.get('SUBNET'))},
namespace='composer', descriptor=yaml_dataflow_create_job_from_template,
timeout_s=60 * 15, dag=dag)
Я заметил [2019-09-18 14:23:23,282] {logging_mixin.py:95} INFO - Error running jinja on yam
без дальнейшего объяснения.
Эта проблема не позволяла правильно интерпретировать любые инструкции Python, использующие символы [
, ]
или :
.Любой цикл while
или for
, а также любая инструкция if
, else
или elif
не были распознаны, а также получение какого-либо значения из ответа JSON или какого-либо массива было невозможно.Airflow {{ ds}}
, {{ ds_nodash }}
, {{ yesterday }}
и {{ yesterday_ds_nodash }}
также были повреждены из-за ошибки Jinja.
Через несколько дней я наконец заметил, что эта проблема была решена путем замены переменной JSON Python из этой:
BODY = (
'{'
' "jobName": "{{params.date}}-dataflow-job",'
' "parameters": {'
' "inputLogFile" : "gs://project-bucket/input/{{ params.yesterday_date }}.file.fr.log.gz",'
' "outputLogsByDayTable": "{{params.outputLogsByDayTable}}",'
' "outputLogsByWeekTable": "{{params.outputLogsByWeekTable}}",'
' "outputLogsByMonthTable": "{{params.outputLogsByMonthTable}}",'
' "outputRawLogsTable": "{{params.outputRawLogsTable}}"'
' },'
' "environment": {'
' "serviceAccountEmail": "{{params.sac}}",'
' "tempLocation": "{{params.templocation}}",'
' "zone": "{{params.zone}}",'
' "network": "shared",'
' "subnetwork": "{{params.network}}"'
' }'
'}'
)
на это:
BODY= '{' \
' "jobName": "{{ params.date }}-dataflow-job",' \
' "parameters": {' \
' "inputLogFile" : "gs://project-bucket/input/{{ params.yesterday_date }}.input.fr.log.gz",' \
' "outputLogsByDayTable": "{{ params.outputLogsByDayTable }}",' \
' "outputLogsByWeekTable": "{{ params.outputLogsByWeekTable }}",' \
' "outputLogsByMonthTable": "{{ params.outputLogsByMonthTable }}",' \
' "outputRawLogsTable": "{{ params.outputRawLogsTable }}"' \
' },' \
' "environment": {' \
' "serviceAccountEmail": "{{ params.sac }}",' \
' "tempLocation": "{{ params.templocation }}",' \
' "zone": "{{ params.zone }}",' \
' "network": "shared",' \
' "subnetwork": "{{ params.network }}"' \
' }' \
'}'
Хотя это было синтаксически эквивалентно.ps: не обращайте внимания на тот факт, что я использовал params, пока в этом не было необходимости ... Я просто объявил их один раз в начале моего DAG:
yesterdayds = '{{ yesterday_ds_nodash }}'
executiondate = '{{ ds_nodash }}'
date = '{{ ds }}'