Ускорьте процедуру внутри процесса с высоким использованием памяти, используя параллельную обработку в Python - PullRequest
0 голосов
/ 04 сентября 2018

У меня есть программа, которая использует много памяти (150 ГБ векторов, проиндексированных с помощью nmslib), и у меня возникают проблемы с распараллеливанием выполнения кода. Моя машина имеет 40 ядер, и мои попытки распараллелить ее пока не увенчались успехом. программа сначала загружает векторы, а затем подготавливает некоторые данные на основе векторов (эта часть хороша, а производительность хороша, потому что большая часть рабочей нагрузки выполняется nmslib, который сам по себе является многопоточным). Теперь проблема начинается, когда я постобработаю данные, которые были загружены в ОЗУ с помощью nmslib. Я перебираю список из 500 записей, каждая из которых представляет данные одного файла. Код, который я использую для обработки этих данных и который я пытаюсь выполнить параллельно, представляет собой следующую процедуру:

def tib_result_turn_to_file(data):
fileindex = data[0]
main_bucket = data[1]
result_string = ""
print("Now processing: " + fileindex[0])
print(abs(fileindex[1]-fileindex[2]))
#print(len(results))
c = fileindex[1]
c1 = 0
while c < fileindex[2]:
    if main_bucket == "tengyur1":
        tibwords = tibwords_tengyur1
    if main_bucket == "tengyur2":
        tibwords = tibwords_tengyur2
    if main_bucket == "kangyur":
        tibwords = tibwords_kangyur
    result_string += "\n" + main_bucket + "#" + fileindex[0] + " " + str(c)
    for result in data[2][c1]:
        bucket = result[2]
        if bucket == "tengyur1":
            tibwords = tibwords_tengyur1
        if bucket == "tengyur2":
            tibwords = tibwords_tengyur2
        if bucket == "kangyur":
            tibwords = tibwords_kangyur
        bucket = result[2]
        result_position = result[1]
        result_score = result[0]
        # we don't want results that score too low or that are identical with the source:
        if result_score < 0.03 and (result_position < c- 20 or result_position > c + 20):
                result_string += "\t"  + bucket + "#" + tibwords[result_position][0] + "#" + str(result_position)
    c += 1
    c1 += 1
with open("/mnt/output_parallel/" + fileindex[0][:-4] + "_parallel_index.org", "w") as text_file:
    text_file.write(result_string)

списки, начинающиеся с тибвордов, - это огромные списки из каждых 50 миллионов записей. Они определены в процедуре воспитания и не изменяются в этой программе, поэтому я предполагаю, что они не будут скопированы. Теперь каждая партия данных, которая вводится в эту процедуру, сама по себе не маленькая, если я расскажу, что в среднем получу около 500 МБ. Поскольку единственной целью этой подпрограммы является создание побочного эффекта путем записи файла в конце его выполнения, и поскольку он не изменяет никакие данные, которые могут быть переданы другим потокам, я предполагаю, что распараллелить его должно быть довольно просто , Однако пока ничего не получалось. Что я пробовал:

Parallel(n_jobs=40,backend="threading")(delayed(tib_result_turn_to_file)(i,bucket) for i in files)

Это, кажется, создает много потоков, но они, кажется, не делают много. Я думаю, что GIL мешает, в лучшем случае используется одно ядро.

Parallel(n_jobs=40)(delayed(tib_result_turn_to_file)(i,bucket) for i in files)

Это сломается, потому что он жалуется на использование памяти. Если я добавлю параметр require = 'sharedmem', он запустится, но тогда он будет таким же медленным, как и предыдущая попытка. Третье решение:

pool = multiprocessing.Pool(processes=4)
pool.map(tib_result_turn_to_file,files,bucket)
pool.close()

Ошибка с OOM. Я не понимаю почему, хотя. Все данные, к которым осуществляется доступ внутри подпрограммы, доступны только для чтения, и даже если я уменьшу подпрограмму до:

def tib_result_turn_to_file(data):
    print("Hello world")

Пул не будет работать с OOM. Это потому, что я загружаю огромные индикаторы в предыдущем разделе программы (которые в настоящее время все еще находятся в памяти, но больше не используются)? Если это должно быть причиной, есть ли какой-нибудь возможный способ обойти эту проблему? Правильный ли мой подход в конце концов? Интересно, стоит ли мне разбить эту программу на две части, сначала выполнить векторные операции с nmslib, а на втором этапе - постобработку, но, по моему мнению, это добавит много нежелательных сложностей.

1 Ответ

0 голосов
/ 05 сентября 2018

Решение действительно состоит в том, что мне пришлось удалить все ранее созданные индексы nmslib, прежде чем я смог распараллелить дальнейшее выполнение подпрограммы, даже если эти переменные не используются внутри новых процессов. Похоже, что Python не может помочь, но скопировать все в новые процессы.

...