У меня есть рабочая станция с 72 ядрами (на самом деле 36 многопоточных ЦП, показанные как 72 ядра multiprocessing.cpu_count()
).
Я пробовал оба multiprocessing
и ray
для одновременной обработки, в пакетах по миллионы маленьких файлов, и я хотел бы записать несколько выходных файлов одновременно во время этой обработки.
Я запутался с блокировкой методов .get()
, связанных, например, с apply_async()
(в multiprocessing
) и ray.get()
.
С ray
у меня есть удаленная функция (process_group()
), которая обрабатывает группы данных параллельно в al oop. Далее в качестве комментариев также указывается версия кода с использованием модуля multiprocessing
.
import ray
import pandas as pd
# from multiprocessing import Pool
ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
##-----------------------
## With ray :
df_list = ray.get([process_group.remote(data) for data in data_list])
##-----------------------
## With multiprocessing :
#f_list = pool.map(process_group, list_of_indices_into_data_list)
##
## data are both known from the parent process
## and I use copy-on-write semantic to avoid having 60 copies.
## All the function needs are a list of indices
## of where to fetch slices of the read-only data.
##
very_big_df = pd.concatenate(df_list)
##-----------------------
## Write to file :
very_big_df.to_parquet(outputfile)
Итак, на каждой итерации l oop я должен собирать выходные данные множества process_group()
, которые были вычислены одновременно, в виде списка фреймов данных df_list
для объединения в один больший very_big_df
датафрейм. Последний должен быть записан на диск (обычно размеры от ~ 1 до ~ 3 ГБ). Запись одного такого файла занимает около 10-30 [s]
, тогда как для обработки пультов process_group
требуется 180 [s]
. Есть тысячи l oop итераций. Таким образом, это займет несколько дней.
Можно ли записать файл на диск неблокирующим образом, в то время как l oop продолжается, чтобы сэкономить около 10% времени ( это сэкономило бы около одного дня вычислений)?
К тому моменту, когда параллельные процессы следующей итерации l oop завершатся, будет достаточно времени для вывода результатов предыдущей итерации. Все задействованные ядра работают на 100%, поэтому модуль Threading
, вероятно, тоже не рекомендуется. multiprocessing.apply_async()
еще более расстраивает, так как не хочет, чтобы мой невыбираемый вывод very_big_df
фрейм данных, которым я должен был бы поделиться с некоторыми более сложными, которые могут стоить мне времени, которое я пытаюсь сохранить и Я надеялся, что ray
справится с чем-то подобным эффективно.
[ОБНОВЛЕНИЕ] Ради простоты я не упомянул, что существует большая общая переменная среди всех процессов (именно поэтому я назвал это параллельный процесс, а также одновременная запись файла). Мой заглавный вопрос был отредактирован в результате. Итак, на самом деле, перед заданиями ray параллельного кода есть немного кода:
shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])
Не уверен, хотя делает ли это его больше похожим на «параллельное» выполнение, а не только на параллельные операции.
[ОБНОВЛЕНИЕ 2] Общий массив является таблицей поиска, то есть только для чтения, если речь идет о параллельных рабочих.
[ОБНОВЛЕНИЕ 3] Я попробовал оба предложенных решения: Threading и Ray / compute (). Для последнего было предложено использовать функцию записи в качестве удаленной и отправлять операцию записи асинхронно в течение для l * 1123. *, который я изначально считал возможным только через .get (), который будет блокироваться.
Таким образом, с Ray это показывает оба решения:
@ray.remote
def write_to_parquet(df_list, filename):
df = pd.concat(df_list)
df.to_parquet(filename, engine='pyarrow', compression=None)
# Share array created outside the loop, read-only (big lookup table).
# About 600 MB
shared_array_id = ray.put(shared_array)
for data_list in many_data_lists:
new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
write_to_parquet.remote(df_list, my_filename)
## Using threading, one would remove the ray decorator:
# write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
# write_thread.start()
Для решения RAY это требовало, однако, чтобы увеличить object_store_memory, по умолчанию было недостаточно: 10% памяти узла ~ 37 ГБ (у меня есть 376 ГБ ОЗУ), который ограничивается 20 ГБ, и единственные объекты хранятся в общей сложности около 22 ГБ: один список фреймов данных df_list
(около 11 ГБ) и результат их объединения в функцию записи (около 11 GB), при условии, что во время объединения имеется копия. Если нет, то эта проблема с памятью не имеет смысла, и мне интересно, смогу ли я пропустить numpy просмотров, которые, как я думал, происходили по умолчанию. Это довольно разочаровывающий аспект RAY, так как я не могу предсказать, какой объем памяти будет у каждого df_list
, он может варьироваться от 1x до 3x ...
В конце концов, придерживаясь multiprocessing
с Потоковая обработка является самым эффективным решением, поскольку обрабатывающая часть (без ввода-вывода) быстрее:
from multiprocessing import Pool
# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)
def process_group(my_data):
# Process a new dataframe here using my_data and some other data inside shared_array
...
return my_df
n_workers = 60
with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
# data_list contains thousands of elements. I choose a chunksize of 10
df_list = pool.map(process_group, data_list, 10)
write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
write_thread.start()
На каждой итерации l oop, обычно len(many_data_lists) = 7000
, и каждый список содержит 7 numpy массивы размера (3, 9092). Таким образом, эти 7000 списков рассылаются 60 рабочим:
время для всех параллельных process_group
на л oop итераций:
RAY: 250 [s]
Многопроцессорная обработка: 233 [s]
Ввод / вывод: для файла паркета объемом 5 ГБ требуется около 35 с быть записанным на внешнем вращающемся диске USB 3. Около 10 секунд на внутреннем вращающемся диске.
Луч : ~ 5 с накладными расходами на создание будущего с write_to_parquet.remote()
, который блокирует l oop. Это все еще 50% времени, которое потребуется для записи на вращающийся диск. Это не идеально.
многопроцессорная обработка : измерено служебное значение 0 с.
общее время стены:
Луч : 486 [s]
Многопроцессорная обработка : 436 [s]
Я повторил это несколько раз, различия между Ray и Многопроцессорная обработка постоянно показывают Многопроцессорность быстрее на ~ 50 с , Это существенная разница, также озадачивающая, поскольку Ray рекламирует более высокую эффективность.
Я выполню это для более длительного числа итераций и сообщу о стабильности (память, потенциальные проблемы со сбором мусора, ...)