РЕДАКТИРОВАТЬ: Решение Сначала я разбил мои файлы на 10000 строк, используя функцию split
bash.Затем:
with Pool(processes=32) as pool:
for level in range(75):
all_results=[]
for f in level_dir:
res = pool.apply_async(process_file, args=[f, level]
all_results.append(res)
for res in all_results:
res.get()
save_matrix()
Я работаю с несколькими действительно большими файлами CSV (74 из них, от 10 ГБ до 65 ГБ), и мне нужно читать их построчно, чтобы извлечь из них данные и поместить их в матрицу.
Я использую Python, так как синтаксический анализ файлов CSV с пустыми полями, а также полей JSON не так прост, как использование C / C ++.
Сейчас я делаю то, чтоиспользуйте ThreadPool, но он не выглядит так, как будто он использует процессор на полную мощность (Xeon E5), и я думаю, что это может быть из-за заполнения матрицы.
M = np.zeros((users.size, levels.size, 2))
def process_line(row):
data = json.loads(row[3])
usr = data['usr']
#compute stuff
M[usr, level, 0] = score
M[usr, level, 1] = t_len
def main():
for level in range(75):
csv_f = open("level{}.csv".format(level))
reader = csv.reader(csv_f)
t = ThreadPool(processes=32)
for row in reader:
t.map(process_line, (row, level, ))
t.join()
t.close()
np.save("matrix.npy", M)
Когда я печатаю отметки временипри каждом строковом процессе изменение количества процессов ничего не меняет, оно происходит так же медленно, как и при отсутствии ThreadPool.
Что можно сделать, чтобы мой код работал быстрее?
Если я продолжу так делать, буквально на это уйдет 3 месяца.