Воздушный поток не может получить статус успеха из потока данных - PullRequest
0 голосов
/ 26 октября 2018

При отправке задания потока данных из воздушного потока, он не может получить статус успеха задания потока данных и продолжает отображать приведенную ниже ошибку.

{gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..

воздушный поток Дага

t2 = DataFlowPythonOperator(
task_id='google_dataflow',
py_file='/Users/abc/sample.py',
gcp_conn_id='connection_id',
dataflow_default_options={
    "project": 'Project_id'
    "runner": "DataflowRunner",
    "staging_location": 'gs://Project_id/staging',
    "temp_location": 'gs://Project_id/staging'
}
)

Sample.py

def run():
argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

with beam.Pipeline(argv=argv) as p:
  (p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(
        query = 'Select * from `ds.table` limit 10', 
        use_standard_sql=True))

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

прочитал другие ответы на форуме, и, как и предполагалось, я удалил имена заданий из sample.py, а также dag Airflow, но по-прежнему не удается получить код возврата успеха.

Из журнала воздушных потоков, когда задание передается в поток данных

{gcp_dataflow_hook.py:116} INFO - Running command:  python /Users/abc/sample.py 
--runner=DataflowRunner -- project=project_id --region=region_name - 
labels=airflow-version=v1-10-0 --job_name=google_dataflow-f8a478ae

после завершения задания потока данных

{gcp_dataflow_hook.py:128} WARNING - INFO:root:Job 2018-10-26_06_07_04- 
 17336980599969256162 is in state JOB_STATE_DONE
{gcp_api_base_hook.py:90} INFO - Getting connection using a JSON key file.
{discovery.py:866} INFO - URL being requested: GET 
https://dataflow.googleapis.com/v1b3/projects/project_id/locations/us- 
central1/jobs?alt=json
{gcp_dataflow_hook.py:77} INFO - Google Cloud DataFlow job not available yet..

Не уверен, как с этим разобраться, может кто-нибудь помочь

Сводка заданий потока данных из консоли

Job name beamapp-user-1026130638-681570
Job ID 2018-10-26_06_07_04-17336980599969256162
Region us-central1
Job statusSucceeded
SDK version Apache Beam SDK for Python 2.7.0
...