Проблема с файлом CSV, когда файл большой - PullRequest
0 голосов
/ 27 декабря 2018

У меня большой файл tsv (~ 2.5Gb).Я перебираю каждую строку, где в строке 6 вкладок.Я беру первую вкладку каждой строки и добавляю строку в CSV-файл, основанный на этой первой вкладке.Цель состоит в том, чтобы закончить файлом, отсортированным по файлам csv, основанным на основных строках tsv.

Это работает с мелким файлом, но когда я запускаю большой файл, консоль IPython никогда не завершает работу.Файл, в который я сохраняю, выглядит так, как будто он заполняется, но при открытии он ничего не отображается.

import csv

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for line in masterfile:
        line_split = line.split("|")
        cik = line_split[0].zfill(10)

        save_path = ".../data-sorted/"
        save_path += cik + ".csv"

        with open(save_path, 'a') as savefile:
            wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
            wr.writerow(line_split)

Ответы [ 2 ]

0 голосов
/ 28 декабря 2018

Предполагая, что у вас достаточно ОЗУ, вам может быть лучше отсортировать файл в памяти, например, в словарь, и одновременно записать на диск.Если ввод-вывод действительно является вашим узким местом, вы должны получить много пробега, если откроете выходной файл только один раз.

from collections import defaultdict
from os.path import join

file_path = ".../master.tsv"

data = collections.defaultdict(list)
with open(file_path, 'r') as masterfile:
    for line in masterfile:
        cik = line.split("|", 1)[0].zfill(10)
        data[cik].append(line)

for cik, lines in data.items():
    save_path = join(".../data-sorted", cik + ".csv")

    with open(save_path, 'w') as savefile:
        wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
        for line in lines:
            wr.writerow(line.split("|"))

Возможно, вам не хватит памяти для загрузки всего файла.В этом случае вы можете свалить его порциями, которые, если они будут достаточно большими, все равно в конечном итоге сэкономят вам значительный объем ввода-вывода.Приведенный ниже метод разбиения на фрагменты очень быстрый и грязный.

from collections import defaultdict
from itertools import groupby
from os.path import join

chunk_size = 10000  # units of lines

file_path = ".../master.tsv"

with open(file_path, 'r') as masterfile:
    for _, chunk in groupby(enumerate(masterfile),
                            key=lambda item: item[0] // chunk_size):
        data = defaultdict(list)
        for line in chunk:
            cik = line.split("|", 1)[0].zfill(10)
            data[cik].append(line)
        for cik, lines in data.items():
            save_path = join(".../data-sorted", cik + ".csv")

            with open(save_path, 'a') as savefile:
                wr = csv.writer(savefile, quoting=csv.QUOTE_ALL)
                for line in lines:
                    wr.writerow(line.split("|"))
0 голосов
/ 27 декабря 2018

Ваш код очень неэффективен в том смысле, что он открывает и добавляет данные для каждой строки / строки входного файла, который он обрабатывает, что будет огромным числом раз, если входной файл настолько велик(потому что вызовы ОС, необходимые для этого, являются относительно медленными).

Плюс, в вашем коде есть как минимум одна ошибка, которую я заметил, а именно строка:

save_path += cik + ".csv"

, которая простопродолжает делать save_path все длиннее и длиннее ... не то, что нужно.

В любом случае, вот что должно работать быстрее, хотя для обработки такого большого файла, скорее всего, потребуется довольно много времени.Это ускоряет процесс, кэшируя промежуточные результаты.Он делает это, только открывая различные выходные CSV-файлы и создавая соответствующие им csv.writer объекты как можно реже, в первый раз, когда они необходимы, и снова только в том случае, если они закрыты, поскольку кэш достиг максимальной длины.

Обратите внимание, что кэш может потреблять много памяти сам в зависимости от того, сколько уникальных выходных файлов CSV существует и сколько их можно открыть одновременно, но использование большого количества памяти - это то, что заставляет его работать быстрее.Вам нужно будет поиграть и вручную отрегулировать значение MAX_OPEN, чтобы найти лучший компромисс между скоростью и использованием памяти, при этом оставаясь ниже предела вашей операционной системы того, сколько файлов разрешено открывать одновременно.

Также обратите внимание, что можно было бы сделать то, что он делает, еще более эффективным, более разумно выбирая, какую существующую запись файла закрыть, а не просто выбирая (открытую) случайную.Однако действительно ли это поможет, зависит от того, есть ли какая-либо выгодная группировка или другой порядок данных во входном файле.

import csv
import os
import random

class CSVWriterCache(dict):
    """ Dict subclass to cache pairs of csv files and associated
        csv.writers. When a specified maximum number of them already
        exist, a random one closed, but an entry for it is retained
        and marked "closed" so it can be re-opened in append mode
        later if it's ever referenced again. This limits the number of
        files open at any given time.
    """
    _CLOSED = None  # Marker to indicate that file has seen before.

    def __init__(self, max_open, **kwargs):
        self.max_open = max_open
        self.cur_open = 0  # Number of currently opened csv files.
        self.csv_kwargs = kwargs  # keyword args for csv.writer.

    # Adding the next two non-dict special methods makes the class a
    # context manager which allows it to be used in "with" statements
    # to do automatic clean-up.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __getitem__(self, k):
        if k not in self:
            return self.__missing__(k)
        else:
            try:
                csv_writer, csv_file = self.get(k)
            except TypeError:  # Needs to be re-opened in append mode.
                csv_file = open(k, 'a', newline='')
                csv_writer = csv.writer(csv_file, **self.csv_kwargs)

            return csv_writer, csv_file

    def __missing__(self, csv_file_path):
        """ Create a csv.writer corresponding to the file path and add it
            and the file to the cache.
        """
        if self.cur_open == self.max_open:  # Limit?
            # Randomly choose a cached entry with a previously seen
            # file path that is still open (not _CLOSED). The associated
            # file is then closed, but the entry for the file path is
            # left in the dictionary so it can be recognized as having
            # been seen before and be re-opened in append mode.
            while True:
                rand_entry = random.choice(tuple(self.keys()))
                if self[rand_entry] is not self._CLOSED:
                    break
            csv_writer, csv_file = self[rand_entry]
            csv_file.close()
            self.cur_open -= 1
            self[rand_entry] = self._CLOSED  # Mark as previous seen but closed.

        csv_file = open(csv_file_path, 'w', newline='')
        csv_writer = csv.writer(csv_file, **self.csv_kwargs)
        self.cur_open += 1

        # Add pair to cache.
        super().__setitem__(csv_file_path, (csv_writer, csv_file))
        return csv_writer, csv_file

    # Added, non-standard dict method.
    def close(self):
        """ Close all the opened files in the cache and clear it out. """
        for key, entry in self.items():
            if entry is not self._CLOSED:
                entry[1].close()
                self[key] = self._CLOSED  # Not strictly necessary.
                self.cur_open -= 1  # For sanity check at end.
        self.clear()
        assert(self.cur_open == 0)  # Sanity check.

if __name__ == '__main__':
    file_path = "./master.tsv"
    save_path = "./data-sorted"
    MAX_OPEN  = 1000  # Number of opened files allowed (max is OS-dependent).
#    MAX_OPEN  = 2  # Use small value for testing.

    # Create output directory if it does not exist.
    if os.path.exists(save_path):
        if not os.path.isdir(save_path):
            raise RuntimeError("Path {!r} exists, but isn't a directory")
    else:
        print('Creating directory: {!r}'.format(save_path))
        os.makedirs(save_path)

    # Process the input file using a cache of csv.writers.
    with open(file_path, 'r') as masterfile, \
         CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
        for line in masterfile:
            line_split = line.rstrip().split("|")
            cik = line_split[0].zfill(10)

            save_file_path = os.path.join(save_path, cik + ".csv")
            writer = csv_writer_cache[save_file_path][0]
            writer.writerow(line_split)

    print('{!r} file processing completed'.format(os.path.basename(file_path)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...