Ваш код очень неэффективен в том смысле, что он открывает и добавляет данные для каждой строки / строки входного файла, который он обрабатывает, что будет огромным числом раз, если входной файл настолько велик(потому что вызовы ОС, необходимые для этого, являются относительно медленными).
Плюс, в вашем коде есть как минимум одна ошибка, которую я заметил, а именно строка:
save_path += cik + ".csv"
, которая простопродолжает делать save_path
все длиннее и длиннее ... не то, что нужно.
В любом случае, вот что должно работать быстрее, хотя для обработки такого большого файла, скорее всего, потребуется довольно много времени.Это ускоряет процесс, кэшируя промежуточные результаты.Он делает это, только открывая различные выходные CSV-файлы и создавая соответствующие им csv.writer
объекты как можно реже, в первый раз, когда они необходимы, и снова только в том случае, если они закрыты, поскольку кэш достиг максимальной длины.
Обратите внимание, что кэш может потреблять много памяти сам в зависимости от того, сколько уникальных выходных файлов CSV существует и сколько их можно открыть одновременно, но использование большого количества памяти - это то, что заставляет его работать быстрее.Вам нужно будет поиграть и вручную отрегулировать значение MAX_OPEN
, чтобы найти лучший компромисс между скоростью и использованием памяти, при этом оставаясь ниже предела вашей операционной системы того, сколько файлов разрешено открывать одновременно.
Также обратите внимание, что можно было бы сделать то, что он делает, еще более эффективным, более разумно выбирая, какую существующую запись файла закрыть, а не просто выбирая (открытую) случайную.Однако действительно ли это поможет, зависит от того, есть ли какая-либо выгодная группировка или другой порядок данных во входном файле.
import csv
import os
import random
class CSVWriterCache(dict):
""" Dict subclass to cache pairs of csv files and associated
csv.writers. When a specified maximum number of them already
exist, a random one closed, but an entry for it is retained
and marked "closed" so it can be re-opened in append mode
later if it's ever referenced again. This limits the number of
files open at any given time.
"""
_CLOSED = None # Marker to indicate that file has seen before.
def __init__(self, max_open, **kwargs):
self.max_open = max_open
self.cur_open = 0 # Number of currently opened csv files.
self.csv_kwargs = kwargs # keyword args for csv.writer.
# Adding the next two non-dict special methods makes the class a
# context manager which allows it to be used in "with" statements
# to do automatic clean-up.
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __getitem__(self, k):
if k not in self:
return self.__missing__(k)
else:
try:
csv_writer, csv_file = self.get(k)
except TypeError: # Needs to be re-opened in append mode.
csv_file = open(k, 'a', newline='')
csv_writer = csv.writer(csv_file, **self.csv_kwargs)
return csv_writer, csv_file
def __missing__(self, csv_file_path):
""" Create a csv.writer corresponding to the file path and add it
and the file to the cache.
"""
if self.cur_open == self.max_open: # Limit?
# Randomly choose a cached entry with a previously seen
# file path that is still open (not _CLOSED). The associated
# file is then closed, but the entry for the file path is
# left in the dictionary so it can be recognized as having
# been seen before and be re-opened in append mode.
while True:
rand_entry = random.choice(tuple(self.keys()))
if self[rand_entry] is not self._CLOSED:
break
csv_writer, csv_file = self[rand_entry]
csv_file.close()
self.cur_open -= 1
self[rand_entry] = self._CLOSED # Mark as previous seen but closed.
csv_file = open(csv_file_path, 'w', newline='')
csv_writer = csv.writer(csv_file, **self.csv_kwargs)
self.cur_open += 1
# Add pair to cache.
super().__setitem__(csv_file_path, (csv_writer, csv_file))
return csv_writer, csv_file
# Added, non-standard dict method.
def close(self):
""" Close all the opened files in the cache and clear it out. """
for key, entry in self.items():
if entry is not self._CLOSED:
entry[1].close()
self[key] = self._CLOSED # Not strictly necessary.
self.cur_open -= 1 # For sanity check at end.
self.clear()
assert(self.cur_open == 0) # Sanity check.
if __name__ == '__main__':
file_path = "./master.tsv"
save_path = "./data-sorted"
MAX_OPEN = 1000 # Number of opened files allowed (max is OS-dependent).
# MAX_OPEN = 2 # Use small value for testing.
# Create output directory if it does not exist.
if os.path.exists(save_path):
if not os.path.isdir(save_path):
raise RuntimeError("Path {!r} exists, but isn't a directory")
else:
print('Creating directory: {!r}'.format(save_path))
os.makedirs(save_path)
# Process the input file using a cache of csv.writers.
with open(file_path, 'r') as masterfile, \
CSVWriterCache(MAX_OPEN, quoting=csv.QUOTE_ALL) as csv_writer_cache:
for line in masterfile:
line_split = line.rstrip().split("|")
cik = line_split[0].zfill(10)
save_file_path = os.path.join(save_path, cik + ".csv")
writer = csv_writer_cache[save_file_path][0]
writer.writerow(line_split)
print('{!r} file processing completed'.format(os.path.basename(file_path)))