Ошибка шаблона воздушного потока Jinja при разборе K8SOperator с Python - PullRequest
0 голосов
/ 25 сентября 2019

Я боролся с ошибкой шаблонизации Jinja в composer-1.5.2-airflow-1.10.0.

Мне нужен был оператор K8SO в DAG для запуска ежедневного задания потока данных из опубликованного шаблона через CI/ CD после ежедневной передачи файлов с FTP и после начала работы, мне нужно было проверить статус работы через REST API.

В моей DAG я полагаюсь на образ Docker Python для запуска GCP REST API через Python Google Client.

Вот мой пример дескриптора yaml ниже:

yaml_dataflow_create_job_from_template= """
containers:
  - name: load-data
    image: registry.xxx.yyyy.com/docker-images/python-gcloud:latest
    resources: 
      requests: 
        memory: "256Mi"
        cpu: "0.5"
      limits:
        memory: "2Gi"
        cpu: "2"
    args:
      - python
      - "-c"
      - |
        import os
        import io
        import json
        import google.cloud.storage.client as storage
        from googleapiclient.discovery import build

        service = build('dataflow', 'v1b3')

        GCSPATH="gs://{{params.bucket}}/templates/{{params.template_name}}"

        BODY = (
        '{'
        '    "jobName": "{{params.date}}-dataflow-job",'
        '    "parameters": {'
        '        "inputLogFile" : "gs://project-bucket/input/{{ params.yesterday_date }}.file.fr.log.gz",'
        '        "outputLogsByDayTable": "{{params.dataByDayTable}}",'
        '        "outputLogsByWeekTable": "{{params.dataByWeekTable}}",'
        '        "outputLogsByMonthTable": "{{params.dataByMonthTable}}",'
        '        "outputRawLogsTable": "{{params.dataRawTable}}"'
        '    },'
        '    "environment": {'
        '        "serviceAccountEmail": "{{params.sac}}",'
        '        "tempLocation": "{{params.templocation}}",'
        '        "zone": "{{params.zone}}",'
        '        "network": "shared",'
        '        "subnetwork": "{{params.network}}"'
        '    }'
        '}'
        )

        request = service.projects().locations().templates().launch(projectId="{{params.project_id}}", location="{{params.region}}", gcsPath=GCSPATH, body=json.loads(BODY))
        response = request.execute()

        print("send : ")
        print("response : "+ json.dumps(response))

        job_id = ""
        try:
           status = response['job']['currentState']
        except KeyError: 
           job_id = response['job']['id']

        import json
        import time

        print("Start :" + time.ctime())
        count = 0
        status = 'unknown'

        statusRequest = service.projects().locations().jobs().get(projectId="{{params.project_id}}", location="{{params.region}}", jobId=job_id)
        while (status == "unknown" and count <=200):
           time.sleep( 5 )
           statusResponse = statusRequest.execute()
           status = statusResponse['job']['currentState']
           print("Request#"+str(count)+" job("+job_id+") status :"+ status)
           count = count + 1
           try:
              status = response['job']['currentState']
           except KeyError: 
              status = 'unknown'
        print("Request#"+str(count)+" job("+job_id+") status :"+ status)
        print("End : " + time.ctime())

    volumeMounts:
      - name: google-cloud-key
        mountPath: /var/secrets/google
        readOnly: true
    env:
      - name: GOOGLE_APPLICATION_CREDENTIALS
        value: /var/secrets/google/credentials-keyfile.json

imagePullSecrets:
  - name: gitlab-key
volumes:
  - name: google-cloud-key
    secret:
      secretName: credentials-keyfile
"""

Вот мой K8sOperator

dataflow_job = K8SJobOperator(task_id="dataflow_daily_job",
                                          location=location,
                                          project_id=host_project,
                                          cluster_name=host_cluster,
                                          name="dataflow_daily_job",
                                          gcp_conn_id='gcp_kub_runner',
                                          params={"yesterday_date": yesterdayds, "date": executiondate, "bucket": SEOLOG_SOURCING_BUCKET, "template_name" : "DataflowTmpl" , "sac" : 'service-account@project.iam.gserviceaccount.com', "job_name" : '{executiondate}-Dataflow-job'.format(executiondate=executiondate), "inputLogFile" : 'gs://project-bucket/input/{yesterdayds}.file.fr.log.gz'.format(yesterdayds=yesterdayds), "outputLogsByDayTable" : "project:dataset.DATA_BYDAY", "outputLogsByWeekTable" : "project:dataset.DATA_BYWEEK", "outputLogsByMonthTable" : "project:dataset.DATA_BYMONTH", "outputRawLogsTable" : "project:dataset.RAW_DATA" ,"templocation" : "gs://project-bucket/input/temp/", "region": Variable.get('REGION'), "zone": Variable.get('LOCATION'), "project_id" : Variable.get('DP_PROJECT'), "network" : "https://www.googleapis.com/compute/v1/{subnet}".format(dpsubnet=Variable.get('SUBNET'))},
                                          namespace='composer', descriptor=yaml_dataflow_create_job_from_template,
                                          timeout_s=60 * 15, dag=dag)

Я заметил [2019-09-18 14:23:23,282] {logging_mixin.py:95} INFO - Error running jinja on yam без дальнейшего объяснения.

Эта проблема не позволяла правильно интерпретировать любые инструкции Python, использующие символы [, ] или :.Любой цикл while или for, а также любая инструкция if, else или elif не были распознаны, а также получение какого-либо значения из ответа JSON или какого-либо массива было невозможно.Airflow {{ ds}}, {{ ds_nodash }}, {{ yesterday }} и {{ yesterday_ds_nodash }} также были повреждены из-за ошибки Jinja.

Через несколько дней я наконец заметил, что эта проблема была решена путем замены переменной JSON Python из этой:

        BODY = (
        '{'
        '    "jobName": "{{params.date}}-dataflow-job",'
        '    "parameters": {'
        '        "inputLogFile" : "gs://project-bucket/input/{{ params.yesterday_date }}.file.fr.log.gz",'
        '        "outputLogsByDayTable": "{{params.outputLogsByDayTable}}",'
        '        "outputLogsByWeekTable": "{{params.outputLogsByWeekTable}}",'
        '        "outputLogsByMonthTable": "{{params.outputLogsByMonthTable}}",'
        '        "outputRawLogsTable": "{{params.outputRawLogsTable}}"'
        '    },'
        '    "environment": {'
        '        "serviceAccountEmail": "{{params.sac}}",'
        '        "tempLocation": "{{params.templocation}}",'
        '        "zone": "{{params.zone}}",'
        '        "network": "shared",'
        '        "subnetwork": "{{params.network}}"'
        '    }'
        '}'
        )

на это:

        BODY= '{'  \
        '    "jobName": "{{ params.date }}-dataflow-job",' \
        '    "parameters": {' \
        '        "inputLogFile" : "gs://project-bucket/input/{{ params.yesterday_date }}.input.fr.log.gz",' \
        '        "outputLogsByDayTable": "{{ params.outputLogsByDayTable }}",' \
        '        "outputLogsByWeekTable": "{{ params.outputLogsByWeekTable }}",' \
        '        "outputLogsByMonthTable": "{{ params.outputLogsByMonthTable }}",' \
        '        "outputRawLogsTable": "{{ params.outputRawLogsTable }}"' \
        '    },' \
        '    "environment": {' \
        '        "serviceAccountEmail": "{{ params.sac }}",' \
        '        "tempLocation": "{{ params.templocation }}",' \
        '        "zone": "{{ params.zone }}",' \
        '        "network": "shared",' \
        '        "subnetwork": "{{ params.network }}"' \
        '    }' \
        '}'

Хотя это было синтаксически эквивалентно.ps: не обращайте внимания на тот факт, что я использовал params, пока в этом не было необходимости ... Я просто объявил их один раз в начале моего DAG:

yesterdayds = '{{ yesterday_ds_nodash }}'
executiondate = '{{ ds_nodash }}'
date = '{{ ds }}'
...