У меня есть скрипт, который использует многопроцессорность, чтобы открывать и выполнять вычисления для ~ 200k .csv файлов. Вот рабочий процесс:
1) Рассматривая папку с ~ 200k .csv файлами. Каждый файл .csv содержит следующий пример:
.csv файл:
0, 1
2, 3
4, 5
...
~500 rows
2) Сценарий сохраняет список всех файлов .csv в list()
3) Сценарий делит список с ~ 200k .csv файлами на 8 списков, поскольку у меня доступно 8 процессоров.
4) Сценарий вызывает do_something_with_csv()
8 раз и выполняет вычисления параллельно.
В линейном режиме выполнение занимает около 4 минут.
Параллельно и последовательно, если я выполняю сценарий в первый раз, это занимает намного больше времени. Если я выполню для второго, третьего et c. Время занимает около 1 мин. Похоже, python кэширует операции ввода-вывода в некотором роде? Похоже, потому что у меня есть индикатор выполнения, и, например, если я выполню, пока индикатор выполнения не станет 5k / 200k, и не завершу программу, следующее выполнение будет go, первые 5k будут выполняться очень быстро, а затем замедляться.
Python версия: 3.6.1
Псевдо Python код:
def multiproc_dispatch():
lst_of_all_csv_files = get_list_of_files('/path_to_csv_files')
divided_lst_of_all_csv_files = split_list_chunks(lst_of_all_csv_files, 8)
manager = Manager()
shared_dict = manager.dict()
jobs = []
for lst_of_all_csv_files in divided_lst_of_all_csv_files:
p = Process(target=do_something_with_csv, args=(shared_dict, lst_of_all_csv_files))
jobs.append(p)
p.start()
# Wait for the worker to finish
for job in jobs:
job.join()
def read_csv_file(csv_file):
lst_a = []
lst_b = []
with open(csv_file, 'r') as f_read:
csv_reader = csv.reader(f_read, delimiter = ',')
for row in csv_reader:
lst_a.append(float(row[0]))
lst_b.append(float(row[1]))
return lst_a, lst_b
def do_something_with_csv(shared_dict, lst_of_all_csv_files):
temp_dict = lambda: defaultdict(self.mydict)()
for csv_file in lst_of_all_csv_files:
lst_a, lst_b = read_csv_file(csv_file)
temp_dict[csv_file] = (lst_a, lst_b)
shared_dict.update(temp_dict)
if __name__ == '__main__':
multiproc_dispatch()