Чтение n строк из csv в Google Cloud Storage для использования с модулем Python csv - PullRequest
0 голосов
/ 09 июля 2019

У меня есть множество очень больших (~ 4 ГБ каждый) CSV-файлов, которые содержат разные форматы.Они поступают от регистраторов данных более 10 различных производителей.Я пытаюсь объединить все это в BigQuery.Чтобы загружать их ежедневно, я хочу сначала загрузить эти файлы в облачное хранилище, определить схему, а затем загрузить в BigQuery.В связи с тем, что некоторые файлы содержат дополнительную информацию заголовка (от 2 до 30 строк), я создал свои собственные функции для определения наиболее вероятной строки заголовка и схемы из выборки каждого файла (~ 100 строк), котораяЗатем я могу использовать в job_config при загрузке файлов в BQ.

Это прекрасно работает, когда я работаю с файлами из локального хранилища напрямую в BQ, поскольку я могу использовать менеджер контекста и затем модуль Python csv, в частностиСниффер и ридер объектов.Однако, похоже, не существует эквивалентного метода использования диспетчера контекста непосредственно из хранилища.Я не хочу обходить Cloud Storage в случае, если какой-либо из этих файлов будет прерван при загрузке в BQ.

Что я могу заставить работать:

# initialise variables
with open(csv_file, newline  = '', encoding=encoding) as datafile:
    dialect = csv.Sniffer().sniff(datafile.read(chunk_size))
    reader = csv.reader(datafile, dialect)
    sample_rows = []
    row_num  = 0
    for row in reader:
         sample_rows.append(row)
         row_num+=1
         if (row_num >100):
             break
    sample_rows
# Carry out schema  and header investigation...

С Google Cloud Storage я попыталсяиспользовать download_as_string и download_to_file, которые предоставляют двоичные представления объектов данных, но тогда я не могу заставить модуль csv работать с любыми данными.Я попытался использовать .decode ('utf-8'), и он возвращает длинную строку с \ r \ n.Затем я использовал splitlines (), чтобы получить список данных, но функции csv по-прежнему предоставляют диалект и читатель, который разбивает данные на отдельные символы при каждой записи.

Кто-нибудь успел обойти эту проблему?использовать модуль csv с файлами, хранящимися в облачном хранилище, не загружая весь файл?

1 Ответ

1 голос
/ 11 июля 2019

После просмотра исходного кода csv на GitHub мне удалось использовать модуль io и модуль csv в Python для решения этой проблемы. Io.BytesIO и TextIOWrapper были две ключевые функции для использования. Возможно, это не обычный случай использования, но я решил опубликовать ответ здесь, чтобы сэкономить время для тех, кто в нем нуждается.

# Set up storage client and create a blob object from csv file that you are trying to read from GCS.
content = blob.download_as_string(start = 0, end = 10240) # Read a chunk of bytes that will include all header data and the recorded data itself.
bytes_buffer = io.BytesIO(content)
wrapped_text = io.TextIOWrapper(bytes_buffer, encoding = encoding, newline =  newline)
dialect = csv.Sniffer().sniff(wrapped_text.read()) 
wrapped_text.seek(0)
reader = csv.reader(wrapped_text, dialect)
# Do what you will with the reader object
...