Можете ли вы загрузить данные в формате JSON в таблицу BigQuery, используя Python и load_table_from_ json ()? - PullRequest
0 голосов
/ 10 января 2020

Проблема:

Я пытаюсь загрузить объект JSON в таблицу BigQuery, используя Python 3.7.

Из чтения документации Google модуль google-cloud-bigquery выглядит так, как будто у него есть метод, который должен делать то, что я хочу: load_table_from_json(). Однако, когда я пытаюсь реализовать этот метод в моем скрипте Python, в оболочке Python возвращается следующая ошибка:

BadRequest: 400 Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.

Когда я проверяю историю заданий в BigQuery, у меня появляется некоторая дополнительная информация также:

Error while reading data, error message: JSON processing encountered too many errors, giving up. Rows: 1; errors: 1; max bad: 0; error percent: 0

И

Error while reading data, error message: JSON parsing error in row starting at position 0: Value encountered without start of object.

Вот синтаксис сценария Python, который я запускаю:

import pandas as pd
import numpy as np
from google.cloud import bigquery
import os

### Converts schema dictionary to BigQuery's expected format for job_config.schema
def format_schema(schema):
    formatted_schema = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
    return formatted_schema

### Create dummy data to load
df = pd.DataFrame([[2, 'Jane', 'Doe']],
columns=['id', 'first_name', 'last_name'])

### Convert dataframe to JSON object
json_data = df.to_json(orient = 'records')

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"<My_Credentials_Path>\application_default_credentials.json"

### Define schema as on BigQuery table, i.e. the fields id, first_name and last_name   
table_schema = {
          'name': 'id',
          'type': 'INTEGER',
          'mode': 'REQUIRED'
          }, {
          'name': 'first_name',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'last_name',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }

project_id = '<my_project>'
dataset_id = '<my_dataset>'
table_id = '<my_table>'

client  = bigquery.Client(project = project_id)
dataset  = client.dataset(dataset_id)
table = dataset.table(table_id)

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.schema = format_schema(table_schema)
job = client.load_table_from_json(json_data, table, job_config = job_config)

print(job.result())

Насколько я могу судить из документов это должно работать - но это не так.

Я подозреваю, что проблема связана с JSON объектом json_data, и что BigQuery может не понравиться что-то об этом значении: [{"id":2,"first_name":"Jane","last_name":"Doe"}]. Даже если я передам параметр lines со значением True, это не имеет значения - объект JSON не имеет квадратных скобок: {"id":2,"first_name":"Jane","last_name":"Doe"}

Я также пытался использовать альтернативные значения для * Параметр 1032 *, как описано в документации to_json() , но все они выдают ту же ошибку, что и значение records в приведенном выше примере.

Я также попытался закомментировать строку job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, но получил ту же ошибку.

Обходной путь

Интересно, что в load_table_from_json() документации есть примечание, в котором говорится:

Если ваши данные уже являются строкой JSON, разделенной символом новой строки, лучше обернуть их в файлоподобный объект и передать их в load_table_from_file ():

import io
from google.cloud import bigquery

data = u'{"foo": "bar"}'
data_as_file = io.StringIO(data)

client = bigquery.Client()
client.load_table_from_file(data_as_file, ...)

Если я примените это к моему сценарию и попытайтесь загрузить данные, все это работает. Это указывает на то, что соединение с BigQuery работает правильно и что проблема действительно заключается в загрузке JSON в исходном виде. Нет упоминания об устаревании load_table_from_json() в пользу load_table_from_file(), так почему он не работает?

Рабочий код:

Для справки, вот версия сценария, которая использует load_table_from_file() метод для загрузки данных в BigQuery:

import pandas as pd
import numpy as np
from google.cloud import bigquery
import os
import io

def format_schema(schema):
    formatted_schema = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))   
    return formatted_schema

df = pd.DataFrame([[2, 'Jane', 'Doe']],
columns=['id', 'first_name', 'last_name'])

### Additional parameter used to convert to newline delimited format
json_data = df.to_json(orient = 'records', lines = True)
stringio_data = io.StringIO(json_data)

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"<My_Credentials_Path>\application_default_credentials.json"

table_schema = {
          'name': 'id',
          'type': 'INTEGER',
          'mode': 'REQUIRED'
          }, {
          'name': 'first_name',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'last_name',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }

project_id = '<my_project>'
dataset_id = '<my_dataset>'
table_id = '<my_table>'

client = bigquery.Client(project = project_id)
dataset = client.dataset(dataset_id)
table = dataset.table(table_id)

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.schema = format_schema(table_schema)

job = client.load_table_from_file(stringio_data, table, job_config = job_config)
print(job.result())

1 Ответ

2 голосов
/ 10 января 2020

Функция client.load_table_from_file ожидает объект JSON вместо STRING. Чтобы исправить это, вы можете сделать:

import json

После создания строки JSON из Pandas, вы должны do:

json_object = json.loads(json_data)

И в конце вы должны использовать свой JSON объект:

job = client.load_table_from_json(json_object, table, job_config = job_config)

Таким образом, ваш код будет выглядеть так:

import pandas as pd
import numpy as np
from google.cloud import bigquery
import os, json

### Converts schema dictionary to BigQuery's expected format for job_config.schema
def format_schema(schema):
    formatted_schema = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
    return formatted_schema

### Create dummy data to load
df = pd.DataFrame([[2, 'Jane', 'Doe']],
columns=['id', 'first_name', 'last_name'])

### Convert dataframe to JSON object
json_data = df.to_json(orient = 'records')
json_object = json.loads(json_data)

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"<My_Credentials_Path>\application_default_credentials.json"

### Define schema as on BigQuery table, i.e. the fields id, first_name and last_name   
table_schema = {
          'name': 'id',
          'type': 'INTEGER',
          'mode': 'REQUIRED'
          }, {
          'name': 'first_name',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'last_name',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }

project_id = '<my_project>'
dataset_id = '<my_dataset>'
table_id = '<my_table>'

client  = bigquery.Client(project = project_id)
dataset  = client.dataset(dataset_id)
table = dataset.table(table_id)

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.schema = format_schema(table_schema)
job = client.load_table_from_json(json_object, table, job_config = job_config)

print(job.result())

Пожалуйста, дайте мне знать, если это вам поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...