Запись файлов одновременно с другими задачами, связанными с процессором, с помощью многопроцессорной обработки или луча - PullRequest
0 голосов
/ 13 февраля 2020

У меня есть рабочая станция с 72 ядрами (на самом деле 36 многопоточных ЦП, показанные как 72 ядра multiprocessing.cpu_count()).

Я пробовал оба multiprocessing и ray для одновременной обработки, в пакетах по миллионы маленьких файлов, и я хотел бы записать несколько выходных файлов одновременно во время этой обработки.

Я запутался с блокировкой методов .get(), связанных, например, с apply_async()multiprocessing) и ray.get().

С ray у меня есть удаленная функция (process_group()), которая обрабатывает группы данных параллельно в al oop. Далее в качестве комментариев также указывается версия кода с использованием модуля multiprocessing.

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
   ##-----------------------
   ## With ray :
   df_list = ray.get([process_group.remote(data) for data in data_list])
   ##-----------------------
   ## With multiprocessing :
   #f_list = pool.map(process_group, list_of_indices_into_data_list)
   ##
   ##      data are both known from the parent process
   ##      and I use copy-on-write semantic to avoid having 60 copies.
   ##      All the function needs are a list of indices
   ##      of where to fetch slices of the read-only data.  
   ##
   very_big_df = pd.concatenate(df_list)
   ##-----------------------
   ## Write to file :
   very_big_df.to_parquet(outputfile)

Итак, на каждой итерации l oop я должен собирать выходные данные множества process_group(), которые были вычислены одновременно, в виде списка фреймов данных df_list для объединения в один больший very_big_df датафрейм. Последний должен быть записан на диск (обычно размеры от ~ 1 до ~ 3 ГБ). Запись одного такого файла занимает около 10-30 [s], тогда как для обработки пультов process_group требуется 180 [s]. Есть тысячи l oop итераций. Таким образом, это займет несколько дней.

Можно ли записать файл на диск неблокирующим образом, в то время как l oop продолжается, чтобы сэкономить около 10% времени ( это сэкономило бы около одного дня вычислений)?

К тому моменту, когда параллельные процессы следующей итерации l oop завершатся, будет достаточно времени для вывода результатов предыдущей итерации. Все задействованные ядра работают на 100%, поэтому модуль Threading, вероятно, тоже не рекомендуется. multiprocessing.apply_async() еще более расстраивает, так как не хочет, чтобы мой невыбираемый вывод very_big_df фрейм данных, которым я должен был бы поделиться с некоторыми более сложными, которые могут стоить мне времени, которое я пытаюсь сохранить и Я надеялся, что ray справится с чем-то подобным эффективно.

[ОБНОВЛЕНИЕ] Ради простоты я не упомянул, что существует большая общая переменная среди всех процессов (именно поэтому я назвал это параллельный процесс, а также одновременная запись файла). Мой заглавный вопрос был отредактирован в результате. Итак, на самом деле, перед заданиями ray параллельного кода есть немного кода:

shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])

Не уверен, хотя делает ли это его больше похожим на «параллельное» выполнение, а не только на параллельные операции.

[ОБНОВЛЕНИЕ 2] Общий массив является таблицей поиска, то есть только для чтения, если речь идет о параллельных рабочих.

[ОБНОВЛЕНИЕ 3] Я попробовал оба предложенных решения: Threading и Ray / compute (). Для последнего было предложено использовать функцию записи в качестве удаленной и отправлять операцию записи асинхронно в течение для l * 1123. *, который я изначально считал возможным только через .get (), который будет блокироваться.

Таким образом, с Ray это показывает оба решения:

@ray.remote
def write_to_parquet(df_list, filename):
    df = pd.concat(df_list)
    df.to_parquet(filename, engine='pyarrow', compression=None)

# Share array created outside the loop, read-only (big lookup table). 
# About 600 MB
shared_array_id = ray.put(shared_array)

for data_list in many_data_lists:

   new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
   write_to_parquet.remote(df_list, my_filename)

   ## Using threading, one would remove the ray decorator:
   # write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
   # write_thread.start()

Для решения RAY это требовало, однако, чтобы увеличить object_store_memory, по умолчанию было недостаточно: 10% памяти узла ~ 37 ГБ (у меня есть 376 ГБ ОЗУ), который ограничивается 20 ГБ, и единственные объекты хранятся в общей сложности около 22 ГБ: один список фреймов данных df_list (около 11 ГБ) и результат их объединения в функцию записи (около 11 GB), при условии, что во время объединения имеется копия. Если нет, то эта проблема с памятью не имеет смысла, и мне интересно, смогу ли я пропустить numpy просмотров, которые, как я думал, происходили по умолчанию. Это довольно разочаровывающий аспект RAY, так как я не могу предсказать, какой объем памяти будет у каждого df_list, он может варьироваться от 1x до 3x ...

В конце концов, придерживаясь multiprocessing с Потоковая обработка является самым эффективным решением, поскольку обрабатывающая часть (без ввода-вывода) быстрее:

from multiprocessing import Pool

# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)

def process_group(my_data):
   # Process a new dataframe here using my_data and some other data inside shared_array
   ...
   return my_df


n_workers = 60
with Pool(processes=n_workers) as pool:
   for data_list in many_data_lists:
      # data_list contains thousands of elements. I choose a chunksize of 10
      df_list = pool.map(process_group, data_list, 10)
      write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
            write_thread.start()

На каждой итерации l oop, обычно len(many_data_lists) = 7000, и каждый список содержит 7 numpy массивы размера (3, 9092). Таким образом, эти 7000 списков рассылаются 60 рабочим:

время для всех параллельных process_group на л oop итераций:

RAY: 250 [s]

Многопроцессорная обработка: 233 [s]

Ввод / вывод: для файла паркета объемом 5 ГБ требуется около 35 с быть записанным на внешнем вращающемся диске USB 3. Около 10 секунд на внутреннем вращающемся диске.

Луч : ~ 5 с накладными расходами на создание будущего с write_to_parquet.remote(), который блокирует l oop. Это все еще 50% времени, которое потребуется для записи на вращающийся диск. Это не идеально.

многопроцессорная обработка : измерено служебное значение 0 с.

общее время стены:

Луч : 486 [s]

Многопроцессорная обработка : 436 [s]

Я повторил это несколько раз, различия между Ray и Многопроцессорная обработка постоянно показывают Многопроцессорность быстрее на ~ 50 с , Это существенная разница, также озадачивающая, поскольку Ray рекламирует более высокую эффективность.

Я выполню это для более длительного числа итераций и сообщу о стабильности (память, потенциальные проблемы со сбором мусора, ...)

1 Ответ

1 голос
/ 13 февраля 2020

Рассматривали ли вы назначение 1 ядра для задачи луча, которая записывает данные в файл?

[ОБНОВЛЕНИЕ] Прототип

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)

@ray.remote
def write_to_parquet(data, filename):
    # write it until succeed.
    # record failed write somewhere. 
    # I assume failure to write is uncommon. You can probably just 
    # write ray.put() and have one background process that keeps failed 
    # write again.

# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
   ##-----------------------
   ## With ray :
   df_list = ray.get([process_group.remote(data) for data in data_list])
   ##-----------------------
   ## With multiprocessing :
   #f_list = pool.map(process_group, list_of_indices_into_data_list)
   ##
   ##      data are both known from the parent process
   ##      and I use copy-on-write semantic to avoid having 60 copies.
   ##      All the function needs are a list of indices
   ##      of where to fetch slices of the read-only data.  
   ##
   very_big_df = pd.concatenate(df_list)
   ##-----------------------
   ## Write to file :

   write_to_parquet.remote(very_big_df, filename)
...