Я пытаюсь выполнить файл Python потока данных, который читает текстовый файл из корзины GCS через DAG воздушного потока, используя его DataFlowPythonOperator. Я был в состоянии выполнить файл Python независимо, но это не удается, когда я выполняю его через поток воздуха. Я использую служебную учетную запись для аутентификации для моего соединения gcp по умолчанию.
Ошибка при выполнении задания:
{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
{models.py:1417} ERROR - DataFlow failed with return code 2
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/dataflow_operator.py", line 182, in execute
self.py_file, self.py_options)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 152, in start_python_dataflow
task_id, variables, dataflow, name, ["python"] + py_options)
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
_Dataflow(cmd).wait_for_done()
File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
self._proc.returncode))
Exception: DataFlow failed with return code 2
Мой скрипт воздушного потока:
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime, timedelta
# Default DAG parameters
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': <email>,
'email_on_failure': False,
'email_on_retry': False,
'start_date': datetime(2018, 4, 30),
'retries': 1,
'retry_delay': timedelta(minutes=1),
'dataflow_default_options': {
'project': '<Project ID>'
}
}
dag = DAG(
dag_id='df_dag_readfromgcs',
default_args=default_args,
schedule_interval=timedelta(minutes=60)
)
task1 = DataFlowPythonOperator(
task_id='task1',
py_file='~<path>/1readfromgcs.py',
gcp_conn_id='default_google_cloud_connection',
dag=dag
)
Файл Python моего Dataflow (1readfromgcs.py) содержит следующий код:
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
import apache_beam.pipeline as pipeline
import apache_beam.io as beamio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
def runCode(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
default='<Input file path>',
help='File name')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--project=<project name>',
'--runner=DataflowRunner',
'--job_name=<job name>',
'--region=europe-west1',
'--staging_location=<GCS staging location>',
'--temp_location=<GCS temp location>'
])
pipeline_options = PipelineOptions(pipeline_args)
p = beam.pipeline.Pipeline(options=pipeline_options)
rows = p | 'read' >> beam.io.ReadFromText(known_args.input)
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
runCode()
Я не могу отладить и выяснить причину этого исключения, и согласно моему исследованию в файле Airflow: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/gcp_dataflow_hook.py, ошибка возникает из следующих строк:
def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
self.log.info("Start waiting for DataFlow process to complete.")
while self._proc.poll() is None:
ret = select.select(reads, [], [], 5)
if ret is not None:
for fd in ret[0]:
line = self._line(fd)
self.log.debug(line[:-1])
else:
self.log.info("Waiting for DataFlow process to complete.")
if self._proc.returncode is not 0:
raise Exception("DataFlow failed with return code {}".format(
self._proc.returncode))
Ценю ваши мысли и помогите с моей проблемой.