Я запускаю задание Dataflow из Airflow.Я должен сказать, что я новичок в Airflow.Поток данных (запускается из Airflow) работает успешно, но я вижу, что у Airflow есть некоторая проблема с получением статуса задания, и я получаю бесконечное сообщение типа:
Задание DataFlow Google Cloud еще не доступно ..
Вот журналы сразу после добавления всех шагов в поток данных (я поместил {projectID} и {jobID} в тех местах, где это было):
[2018-10-01 13:00:13,987] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,987] {gcp_dataflow_hook.py:128} WARNING - b'INFO: Staging pipeline description to gs://my-project/staging'
[2018-10-01 13:00:13,987] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,987] {gcp_dataflow_hook.py:128} WARNING - b'Oct 01, 2018 1:00:13 PM org.apache.beam.runners.dataflow.DataflowRunner run'
[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b'INFO: To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-10-01_06_00_12-{jobID}?project={projectID}'
[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b'Oct 01, 2018 1:00:13 PM org.apache.beam.runners.dataflow.DataflowRunner run'
[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b"INFO: To cancel the job using the 'gcloud' tool, run:"
[2018-10-01 13:00:13,988] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,988] {gcp_dataflow_hook.py:128} WARNING - b'> gcloud dataflow jobs --project={projectID} cancel --region=us-central1 2018-10-01_06_00_12-{jobID}'
[2018-10-01 13:00:13,990] {logging_mixin.py:95} INFO - [2018-10-01 13:00:13,990] {discovery.py:267} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/dataflow/v1b3/rest
[2018-10-01 13:00:14,417] {logging_mixin.py:95} INFO - [2018-10-01 13:00:14,417] {discovery.py:866} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/{projectID}/locations/us-central1/jobs?alt=json
[2018-10-01 13:00:14,593] {logging_mixin.py:95} INFO - [2018-10-01 13:00:14,593] {gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..
[2018-10-01 13:00:29,614] {logging_mixin.py:95} INFO - [2018-10-01 13:00:29,614] {discovery.py:866} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/{projectID}/locations/us-central1/jobs?alt=json
[2018-10-01 13:00:29,772] {logging_mixin.py:95} INFO - [2018-10-01 13:00:29,772] {gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..
[2018-10-01 13:00:44,790] {logging_mixin.py:95} INFO - [2018-10-01 13:00:44,790] {discovery.py:866} INFO - URL being requested: GET https://dataflow.googleapis.com/v1b3/projects/{projectID}/locations/us-central1/jobs?alt=json
[2018-10-01 13:00:44,937] {logging_mixin.py:95} INFO - [2018-10-01 13:00:44,937] {gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..
Знаете ли вы, что может вызватьэтот?Я не мог найти решение, связанное с этой проблемой.Должен ли я предоставить больше информации?
Вот моя задача в DAG:
# dataflow task
dataflow_t=DataFlowJavaOperator(
task_id='mydataflow',
jar='/lib/dataflow_test.jar',
gcp_conn_id='my_gcp_conn',
delegate_to='{service_account}@{projectID}.iam.gserviceaccount.com',
dag=dag)
и опции, связанные с потоком данных в DAG в default_args:
'dataflow_default_options': {
'project': '{projectID}',
'stagingLocation': 'gs://my-project/staging'
}