Читайте огромный JSON построчно из Google Cloud Storage с Python - PullRequest
0 голосов
/ 09 октября 2018

Я знаю, что у меня должен быть код, но у меня пока нет ничего полезного.

В моем GCS gs://path/listings_all.json есть файл ~ 300 ГБ JSON , в итоге я пытаюсь импортировать его вBigQuery, но у него неправильная структура данных (я взял его из mongoexport из MongoDB)

неверное имя поля "$ date".Поля должны содержать только буквы, цифры и подчеркивания, начинаться с буквы или подчеркивания и иметь длину не более 128 символов

Итак, теперь мой подход заключается в том, чтобы каким-то образом читать исходный файл построчно из GCSобработайте его и загрузите каждую обработанную строку в BigQuery с помощью Python API.

Ниже простой читатель, который я собрал, чтобы протестировать образец 100 строк из оригинального огромного файла:

import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

Не могли бы вы, пожалуйстапомогите мне, как я могу читать или передавать огромные строки JSON построчно из Google Cloud Storage с Python3?

Ответы [ 3 ]

0 голосов
/ 09 октября 2018

Строковое чтение, а затем попытка потоковой передачи в BigQuery не увеличится до 300 ГБ на вашем локальном компьютере, и вам будет трудно получить этот рабочий TBH.

Есть несколько масштабируемых опций:

  1. Напишите конвейер Cloud Dataflow для чтения вашего файла из GCS (он будет масштабироваться для вас и читать параллельно), исправьте имя поля и затем запишите в BigQuery.См. здесь .
  2. Загрузите его непосредственно в BigQuery, используя CSV вместо формата JSON и используя разделитель, который не отображается в ваших данных.Это загрузит каждую запись в один столбец String, а затем вы сможете использовать JSON-функции BigQuery для извлечения того, что вам нужно.Смотри здесь .
0 голосов
/ 09 октября 2018

Вот пример реализации решения в GCP Dataflow, которое соответствует первому предложению в принятом ответе .Вам нужно будет выполнить коррекцию json в функции json_processor .Вы можете запустить этот код в блокноте Datalab .

# Datalab might need an older version of pip
# !pip install pip==9.0.3

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist 
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"

def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}

options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"

p = beam.Pipeline(options=options)

(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
   | "json_processor" >> beam.Map(json_processor)
   | "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name, 
                                                       dataset=bigquery_dataset_name, 
                                                       project=project_id, 
                                                       schema=schema, 
                                                       create_disposition='CREATE_IF_NEEDED',
                                                       write_disposition='WRITE_EMPTY'))
)

p.run()
0 голосов
/ 09 октября 2018

Разбор файла json построчно с помощью встроенного парсера json работать не будет (если, конечно, это не "json lines" документ), поэтому вам нужен потоковый парсер вместо.

Но , хотя это решит проблему использования памяти, оно не исправит неверный json, поэтому лучше всего сначала исправить неверный источник json в виде чистого текстового файла, либо вpython или используя sed или какой-либо подобный инструмент, а затем используйте инкрементальный анализатор для анализа вашего контента.

def fixfile(sourcepath, destpath):
    with open(sourcepath) as source, open(destpath, "w") as dest:
        for line in source:
            # you may want to use a regexp if this simple solution
            # breaks something else
            line = line.replace("$date", "date")
            dest.write(line)
...