The template parameters are invalid: Dataflow template invoked from Cloud Composer - PullRequest
0 votes
/ 20 December 2018

I am using Dataprep to create a Dataflow template that essentially inserts a GCS file into a BQ table. I exported the template to the /tmp folder and passed it as a parameter to dataflow_operator.DataflowTemplateOperator(). I cannot get it to run as a Google Cloud Dataflow template from Cloud Composer.
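For reference, a Dataflow template can ship with a metadata JSON that declares exactly which runtime parameter names it accepts. A minimal sketch for listing them from such a file (the `parameters` layout follows the standard template-metadata format; the template name and parameter entries here are hypothetical, not taken from the actual Dataprep export):

```python
import json


def list_template_parameters(metadata_text):
    """Return the parameter names declared in a Dataflow template metadata JSON."""
    metadata = json.loads(metadata_text)
    return [p['name'] for p in metadata.get('parameters', [])]


# Hypothetical metadata in the standard Dataflow template-metadata layout.
sample = json.dumps({
    'name': 'dataprep-export',
    'parameters': [
        {'name': 'inputLocations', 'label': 'Input', 'helpText': '...'},
        {'name': 'outputLocations', 'label': 'Output', 'helpText': '...'},
    ],
})

print(list_template_parameters(sample))
```

Comparing that list against the keys passed in the operator's `parameters` dict would show whether a name mismatch is what triggers "The template parameters are invalid".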

# [START composer_quickstart]

import datetime

from airflow import models
from airflow.contrib.operators import dataflow_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry, set the 'email' arg to your email and
    # enable emailing here.
    'email': models.Variable.get('email'),
    'email_on_success': True,
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes.
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project'),
    'dataflow_default_options': {
        'project': 'sandbox',
        'zone': 'us-central1-a',
        'stagingLocation': models.Variable.get('stg_bucket'),
    }
}

# NOTE: this dict is defined but never passed to the DAG below.
default_args = {
    'dataflow_default_options': {
        'project': 'sandbox',
        'zone': 'us-central1-a',
        'tempLocation': models.Variable.get('tmp_bucket'),
    }
}

# [START composer_quickstart_schedule]
with models.DAG(
        'Composer_DataFLow_InvocationV1',
        # Continue to run the DAG once per day.
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]

    task = dataflow_operator.DataflowTemplateOperator(
        task_id='composer_DataFlow_Template',
        template='gs://masterdata/tmp/cloud-dataprep-dpr-ppd-d-trace-1285741-by-rajeshsampathrajan_template',
        parameters={
            'inputFile': 'gs://masterdata/landing/310_PPD_201812190907.TXT',
            'outputFile': 'sandbox:SALES.ppd_d_trace'
        },
        # conn_id='service-xxxxxx@trifacta-gcloud-prod.iam.gserviceaccount.com',
        dag=dag)


# [START composer_quickstart_steps]
# Define DAG dependencies.
# task >> task
# [END composer_quickstart_steps]

# [END composer_quickstart]
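For context on where those values end up: the operator ultimately issues a `projects.templates.launch` request against the Dataflow v1b3 API, with the template's GCS path as a query parameter and a JSON body built from the task's `parameters` and the `dataflow_default_options`. A rough sketch of that body (field names follow the v1b3 `LaunchTemplateParameters` message; the job name and temp bucket below are placeholders, not the real environment's values):

```python
def build_launch_body(job_name, parameters, environment):
    """Assemble the JSON body for a Dataflow projects.templates.launch call."""
    return {
        'jobName': job_name,
        'parameters': parameters,    # template-specific runtime parameters
        'environment': environment,  # zone, tempLocation, etc.
    }


body = build_launch_body(
    'composer-dataflow-template',
    {'inputFile': 'gs://masterdata/landing/310_PPD_201812190907.TXT',
     'outputFile': 'sandbox:SALES.ppd_d_trace'},
    {'zone': 'us-central1-a', 'tempLocation': 'gs://some-tmp-bucket/tmp'},
)
print(sorted(body))
```

The HTTP 400 in the logs below is the service rejecting the `parameters` portion of exactly this kind of request.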

Logs below:

*** Reading remote log from gs://us-central1-cmpr-ppd-sales-977c1ebb-bucket/logs/Composer_DataFLow_InvocationV1/composer_DataFlow_Template/2018-12-19T00:00:00/22.log.
[2018-12-20 05:33:40,002] {cli.py:374} INFO - Running on host airflow-worker-5f5fc5655d-5gqsz
[2018-12-20 05:33:40,107] {models.py:1196} INFO - Dependencies all met for
[2018-12-20 05:33:40,121] {models.py:1196} INFO - Dependencies all met for
[2018-12-20 05:33:40,121] {models.py:1406} INFO -
--------------------------------------------------------------------------------
Starting attempt 22 of 2
--------------------------------------------------------------------------------

[2018-12-20 05:33:40,179] {models.py:1427} INFO - Executing <Task(DataflowTemplateOperator): composer_DataFlow_Template> on 2018-12-19 00:00:00
[2018-12-20 05:33:40,179] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run Composer_DataFLow_InvocationV1 composer_DataFlow_Template 2018-12-19T00:00:00 --job_id 126 --raw -sd DAGS_FOLDER/dags_ppd_airflow_template.py']
[2018-12-20 05:33:42,062] {base_task_runner.py:98} INFO - Subtask: [2018-12-20 05:33:42,061] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-12-20 05:33:42,142] {base_task_runner.py:98} INFO - Subtask: [2018-12-20 05:33:42,142] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dags_ppd_airflow_template.py
[2018-12-20 05:33:42,983] {base_task_runner.py:98} INFO - Subtask: [2018-12-20 05:33:42,982] {gcp_api_base_hook.py:74} INFO - Getting connection using `gcloud auth` user, since no key file is defined for hook.
[2018-12-20 05:33:43,035] {base_task_runner.py:98} INFO - Subtask: [2018-12-20 05:33:43,035] {discovery.py:852} INFO - URL being requested: POST https://dataflow.googleapis.com/v1b3/projects/ppd-sandbox/templates:launch?alt=json&gcsPath=gs%3A%2F%2Fppd_arm_masterdata%2Ftmp%2Fcloud-dataprep-dpr-ppd-d-trace-1285741-by-rajeshsampathrajan_template
[2018-12-20 05:33:43,036] {base_task_runner.py:98} INFO - Subtask: [2018-12-20 05:33:43,036] {client.py:595} INFO - Attempting refresh to obtain initial access_token
[2018-12-20 05:33:43,257] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-12-20 05:33:43,259] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 6, in <module>
[2018-12-20 05:33:43,259] {base_task_runner.py:98} INFO - Subtask:     exec(compile(open(__file__).read(), __file__, 'exec'))
[2018-12-20 05:33:43,260] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
[2018-12-20 05:33:43,260] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-12-20 05:33:43,260] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
[2018-12-20 05:33:43,261] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-12-20 05:33:43,261] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
[2018-12-20 05:33:43,262] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-12-20 05:33:43,263] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
[2018-12-20 05:33:43,264] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-12-20 05:33:43,265] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/operators/dataflow_operator.py", line 234, in execute
[2018-12-20 05:33:43,265] {base_task_runner.py:98} INFO - Subtask:     self.parameters, self.template)
[2018-12-20 05:33:43,266] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 219, in start_template_dataflow
[2018-12-20 05:33:43,266] {base_task_runner.py:98} INFO - Subtask:     name, variables, parameters, dataflow_template)
[2018-12-20 05:33:43,266] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_dataflow_hook.py", line 260, in _start_template_dataflow
[2018-12-20 05:33:43,267] {base_task_runner.py:98} INFO - Subtask:     response = request.execute()
[2018-12-20 05:33:43,269] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/oauth2client/util.py", line 135, in positional_wrapper
[2018-12-20 05:33:43,270] {base_task_runner.py:98} INFO - Subtask:     return wrapped(*args, **kwargs)
[2018-12-20 05:33:43,270] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/googleapiclient/http.py", line 838, in execute
[2018-12-20 05:33:43,270] {base_task_runner.py:98} INFO - Subtask:     raise HttpError(resp, content, uri=self.uri)
[2018-12-20 05:33:43,271] {base_task_runner.py:98} INFO - Subtask: googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/ppd-sandbox/templates:launch?alt=json&gcsPath=gs%3A%2F%2Fppd_arm_masterdata%2Ftmp%2Fcloud-dataprep-dpr-ppd-d-trace-1285741-by-rajeshsampathrajan_template returned "The template parameters are invalid.">