Ошибка создания облачного компоновщика воздушного потока: Сломанный DAG: невозможно импортировать имя '_parse_data' при импорте нового dag - PullRequest
0 голосов
/ 30 апреля 2019

Я пытаюсь создать DAG в Cloud Composer.При импорте я получаю следующую ошибку:

Broken DAG: [/home/airflow/gcs/dags/airflow_bigquery_v12.py] не может импортировать имя _parse_data

Это файл DAG.Как вы увидите, он пытается скопировать файл облачного хранилища в bigquery:

import datetime
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())
YESTERDAY = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

DEFAULT_ARGS = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': YESTERDAY,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1),
    'project_id': models.Variable.get('gcp_project')
}


with DAG('airflow_bigquery_v12',
         default_args=DEFAULT_ARGS,
         schedule_interval=timedelta(days=1),
         catchup=False
         ) as dag:


    start_task = DummyOperator(task_id="start", dag=dag)
    end_task = DummyOperator(task_id="end", dag=dag)



    gcs_to_bigquery_rides = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_to_BigQuery_stage',
        bucket='my_bucket',
        destination_project_dataset_table='misc.pg_rides_json_airflow',
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=['rides_new.json'],
        #ignore_unknown_values = True,
        #schema_fields=dc(),
        schema_object= 'rides_schema.json',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        #skip_leading_rows = 1,
        google_cloud_storage_conn_id='google_cloud_storage_default',
        bigquery_conn_id='bigquery_default'
        )

start_task >> gcs_to_bigquery_rides >> end_task

Для справки, это файл rides_new.json, который находится внутри my_bucket и содержит схему длятаблица будет создана

[
  {
    "mode": "NULLABLE",
    "name": "finish_picture_state",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_picture_file_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "finish_reason",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "starting_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "finished_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "created_at",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "ending_battery_level",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "state",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "currency",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "amount",
        "type": "INTEGER"
      }
    ],
    "mode": "NULLABLE",
    "name": "cost",
    "type": "RECORD"
  },
  {
    "mode": "NULLABLE",
    "name": "stoped_since",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "user_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "minutes",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "vehicle_id",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "distance",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "service_area_id",
    "type": "STRING"
  },
  {
    "fields": [
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "base",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "INTEGER"
          }
        ],
        "mode": "NULLABLE",
        "name": "per_minute",
      }
    ],
    "mode": "NULLABLE",
    "name": "pricing",
    "type": "RECORD"
  },
  {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "m",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "latitude",
        "type": "FLOAT"
      },
      {
        "mode": "NULLABLE",
        "name": "longitude",
        "type": "FLOAT"
      }
    ],
    "mode": "REPEATED",
    "name": "path",
    "type": "RECORD"
  }
]

ваша помощь очень ценится.спасибо

1 Ответ

1 голос
/ 07 мая 2019

_parse_data устарел на pandas-gbq 0.10.0.

https://github.com/pydata/pandas-gbq/commit/ebcbfbe1fecc90ac9454751206115adcafe4ce24#diff-4db670026d33c02e5ad3dfbd5e4fd595L664

И поток воздуха прекратился с использованием _parse_data после 1.10.0.

https://github.com/apache/airflow/commit/8ba86072f9c5ef81933cd6546e7e2f000f862053#diff-ee06f8fcbc476ea65446a30160c2a2b2L27

Необходимо:

  • Понизить apache-airflow до версии менее 1.10.0 или

  • Понижение pandas-gbq до версии менее 0.10.0.

...