Многопроцессорный пул Pythons не работает параллельно - PullRequest
0 голосов
/ 18 марта 2019

В настоящее время я пытаюсь вычислить ближайших соседей для тысяч векторов и миллионов векторов с 50 измерениями в python.Позволить запустить его последовательно кажется огромной тратой времени, поэтому я хотел распараллелить его.

Сначала я загружаю предварительно обработанные данные с помощью pickle, где у меня есть дикт с моими объектами p в качестве ключей и их векторной формой (p2v) в качестве значения, а также некоторые идентификаторы, которые определяют, какие ps будут просматриваться.Затем я делю идентификаторы на 8 частей, так что каждый процесс должен вычислить соседей для нескольких p.Метод find_closest_n_chunks принимает массив идентификаторов и ищет в p2v dict вектор.Затем он проходит весь диктат, чтобы вычислить ближайшего соседа (для моего теста я смотрю только 10 элементов из дикта).

В настоящее время я работаю на компьютере с Windows со следующим кодом:

import time
import multiprocessing as mp
import pickle

def find_closest_n_chunks(ids, p2v, n):
    print("starting a chunk")
    neighbors = []
    for p in ids:
        p2distance = {}
        vector = p2v[p]
        for key in list(p2v.keys())[:10]:
            distance = cosine(vector, p2v[key])
            p2distance[key] = distance
        sorted_by_distance = sorted(p2distance.items(), key=lambda kv: kv[1])
        neighbors.append(sorted_by_distance[:n])
    print("finished chunk")
    return neighbors

if __name__ == '__main__':
    file = open('./prelim_results/DataHandlerC08.obj', "rb")
    handler = pickle.load(file)
    file = open("./prelim_results/vectorsC08.obj", "rb")
    p2v = pickle.load(file)
    ids = handler.evaluation_ids

    chunks = []
    chunk_p2v = []
    for _ in range(8):
        chunks.append([])
        chunk_p2v.append(p2v.copy())
    for idx, p in enumerate(ids):
        array_idx = idx % 8
        chunks[array_idx].append(p)
    print("starting")
    start = time.time()
    with mp.Pool(processes=8, maxtasksperchild=1) as pool:
        results = []
        for idx, chunk in enumerate(chunks):
            results.append(pool.apply_async(find_closest_n_chunks, (chunk, chunk_p2v[idx], 100)))
        for result in results:
            step = time.time()
            print("step: {0}".format(step-start))
            result.wait()
    end = time.time()
    print(end - start)

Это прекрасно работает при запуске нескольких различных процессов, однако они не работают параллельно на моем 4-ядерном ядре (8с многопоточностью) CPU.Выходные данные подтверждают это:

starting
step: 0.1266627311706543
starting a chunk
finished chunk
step: 15.884509086608887
starting a chunk
finished chunk
step: 24.54252290725708
starting a chunk
finished chunk
step: 33.269429445266724
starting a chunk
finished chunk
step: 42.065810680389404

Сообщение «запуск чанка» вызывается только после «завершения чанка», и каждый чанк занимает около 8-9 секунд, что означает, что каждый чанк будет запускаться только послеДругое закончено.

Обратите внимание, я уже пытался скопировать объект p2v, чтобы убедиться, что у меня нет общего объекта между задачами (как и ожидалось, поскольку он доступен только для чтения, это ничего не изменило).

Можете ли выуказать на любую ошибку, которую я сделал, пытаясь распараллелить ее?

Редактировать: добавлен импорт в начало кода и операторы pickle, которые загружают мои векторы и идентификаторы

Редактировать2: я поменял местами find_closest_n_chunk ()с использованием фиктивного метода:

def dummy():
    print("start dummy")
    time.sleep(5)
    print("end dummy")

Это дало мне правильный вывод, где весь процесс будет запущен до того, как будет завершен первый.Затем я добавил time.sleep (20) к фактическому методу find_closest сразу после начальной строки печати.Это показало, что есть некоторое распараллеливание:

starting
step: 0.07980728149414062
starting a chunk
starting a chunk
starting a chunk
starting a chunk
finished chunk
step: 35.71958327293396
starting a chunk
finished chunk
step: 45.673808574676514
finished chunk
starting a chunk
finished chunk
step: 55.73376154899597
step: 55.73949694633484
starting a chunk
finished chunk
step: 64.74968791007996
starting a chunk
finished chunk
step: 73.77850985527039
finished chunk
step: 82.86862134933472
finished chunk
91.36373448371887

Но для появления каждого "запускающего чанка" требуется 5-10 секунд.Поэтому они не запускаются одновременно

...