Как игнорировать неизвестный столбец при загрузке в bigQuery с помощью Airflow? - PullRequest
0 голосов
/ 25 сентября 2018

Я загружаю данные из Google Storage в bigQuery, используя GoogleCloudStorageToBigQueryOperator

Возможно, файл Json будет иметь больше столбцов, чем я определил.В этом случае я хочу продолжить работу загрузки - просто игнорируйте этот нераспознанный столбец.Я пытался использовать аргумент ignore_unknown_values, но это не имело никакого значения.

Мой оператор:

def dc():
    return [
    {
        "name": "id",
        "type": "INTEGER",
        "mode": "NULLABLE"
    },
    {
        "name": "storeId",
        "type": "INTEGER",
        "mode": "NULLABLE"
    },
 ...
]
gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    ignore_unknown_values = True,
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

Ошибка:

u 'Ошибка при чтении данных, сообщение об ошибке: Ошибка синтаксического анализа JSON в строке, начинающейся в позиции 0: Нет такого поля: shippingService. ',

, что является истинным.shippingService не существует, и он не будет добавлен в таблицу.

Как это исправить?

Редактировать: Удален schema_fields=dc() из оператора:

gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    ignore_unknown_values = True,
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

Все равно выдает ту же ошибку.Это не делает сцену .. У него есть команда, чтобы игнорировать неизвестные значения :(

1 Ответ

0 голосов
/ 25 сентября 2018

Единственная причина, по которой я могу думать, это то, что вы, вероятно, используете Airflow 1.9.Эта функция была добавлена ​​в Воздушный поток 1.10 .

Однако вы можете использовать ее следующим образом в Воздушный поток 1.9 , добавив src_fmt_configs={'ignoreUnknownValues': True}:

gcs_to_bigquery_st = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_to_BigQuery_stage',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_st,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    src_fmt_configs={'ignoreUnknownValues': True},
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
...