Я использую Airflow для запуска задания загрузки в Google BigQuery. Исходные файлы состоят из нескольких файлов ND JSON.
Вот оператор Airflow (не актуально, я думаю. Приведенный для контекста):
load = GoogleCloudStorageToBigQueryOperator(
task_id=f"load",
bigquery_conn_id="bigquery_default",
pool="bigquery_insert",
destination_project_dataset_table="<HIDDEN>",
bucket="<HIDDEN>",
source_objects=list_files(),
source_format="NEWLINE_DELIMITED_JSON",
write_disposition="WRITE_APPEND",
autodetect=True,
ignore_unknown_values=True
)
Чтобы убедиться, что это не ошибка Airflow Я отладил и извлек именно ту полезную нагрузку, которую Airflow отправляет в Google BigQuery REST API:
{
"configuration":{
"load":{
"autodetect":True,
"createDisposition":"CREATE_IF_NEEDED",
"destinationTable":{
"projectId":"<PRIVATE>",
"datasetId":"<PRIVATE>",
"tableId":"<PRIVATE>"
},
"sourceFormat":"NEWLINE_DELIMITED_JSON",
"sourceUris":[
"<PRIVATE>"
],
"writeDisposition":"WRITE_APPEND",
"ignoreUnknownValues":True
}
}
}
Так как я устанавливаю опцию ignoreUnknownValues
( документация ), я бы ожидается, что JSON поля, которые находятся в моих исходных файлах, но не в моей целевой схеме, будут игнорироваться, но я получаю следующую ошибку от BigQuery:
Исключение: сбой задания BigQuery. Последняя ошибка: {'причина': 'неверно', 'сообщение': 'предоставленная схема не соответствует таблице [ЧАСТНЫЕ]. Невозможно добавить поля (field: source_fingerprint) '}. Задание было: {'kind': 'bigquery # job', 'etag': '[PRIVATE]', 'id': '[PRIVATE]', 'selfLink': '[PRIVATE]', 'user_email': ' [PRIVATE] ',' configuration ': {' load ': {' sourceUris ': [[PRIVATE]],' destinationTable ': {' projectId ':' [PRIVATE] ',' datasetId ':' airflow ',' tableId ':' [PRIVATE] '},' createDisposition ':' CREATE_IF_NEEDED ',' writeDisposition ':' WRITE_APPEND ',' sourceFormat ':' NEWLINE_DELIMITED_ JSON ',' ignoreUnknownValues ': True,' autodetect ': True},' jobType ':' LOAD '},' jobReference ': {' projectId ':' [PRIVATE] ',' jobId ':' [PRIVATE] ',' location ':' EU '},' statistics ': {' creationTime ' : '1581675754961', 'startTime': '1581675755090', 'endTime': '1581675755491'}, 'status': {'errorResult': {'reason': 'invalid', 'message': 'Предоставленная схема не соответствует Таблица [ЧАСТНЫЕ]. Невозможно добавить поля (field: source_fingerprint) '},' errors ': [{' reason ':' invalid ',' message ':' Предоставленная схема не соответствует таблице [PRIVATE]. Невозможно добавить поля (field: source_fingerprint) '}],' state ':' DONE '}}
Обратите внимание, что моя опция ignoreUnknownValues
также возвращается к ответу, поэтому она была понятна на их стороне.
Я ожидаю, что дополнительные столбцы будут проигнорированы, и задание будет успешно выполнено, согласно документации:
ignoreUnknownValues: boolean
[Необязательно ] Указывает, должен ли BigQuery разрешать дополнительные значения, которые не представлены в схеме таблицы. Если true, дополнительные значения игнорируются. Если значение равно false, записи с дополнительными столбцами считаются ошибочными, а если слишком много плохих записей, в результате задания возвращается неверная ошибка. Значение по умолчанию неверно. Свойство sourceFormat определяет, что BigQuery рассматривает как дополнительное значение: CSV: конечные столбцы JSON: именованные значения, которые не соответствуют ни одному из имен столбцов
Кто-нибудь знает, что происходит?
Обратите внимание, что я не хочу обновлять свою схему (и, следовательно, я не использую опцию schemaUpdateOptions
). Я хотел бы, чтобы дополнительные столбцы игнорировались.
Спасибо
-
Обновление 1: я использую Airflow 1.10.3, который уже поддерживает этот синтаксис для этого вариант. В старых версиях Airflow этот параметр передавался по-разному, но, как мы видим из информации, которую я разместил, Airflow, похоже, отправляет правильный параметр в Google BigQuery API ( связанный вопрос не применяется) .
Обновление 2. Также при использовании интерфейса командной строки появляется та же ошибка.
bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON --noreplace --ignore_unknown_values [MY TABLE NAME] [MY GCS PATH]
Ожидание bqjob _ [...] _ 1 ... (0 с) Текущее состояние: ВЫПОЛНЕНО Ошибка BigQuery в операции загрузки: Задание обработки ошибки '[...]': Предоставленная схема не соответствует таблице [...]. Невозможно добавить поля (field: metadata_deposit_00_sourceId)
Обновление 3: похоже, проблема возникает, когда я одновременно использую и autodetect
, и ignore_unkown_values
. Если я предоставлю существующую схему как schema_fields
, тогда ignore_unkown_values
будет работать так, как я ожидаю, но это не так ясно для меня в документации.