В настоящее время я пытаюсь вычислить ближайших соседей для тысяч векторов и миллионов векторов с 50 измерениями в python.Позволить запустить его последовательно кажется огромной тратой времени, поэтому я хотел распараллелить его.
Сначала я загружаю предварительно обработанные данные с помощью pickle, где у меня есть дикт с моими объектами p в качестве ключей и их векторной формой (p2v) в качестве значения, а также некоторые идентификаторы, которые определяют, какие ps будут просматриваться.Затем я делю идентификаторы на 8 частей, так что каждый процесс должен вычислить соседей для нескольких p.Метод find_closest_n_chunks принимает массив идентификаторов и ищет в p2v dict вектор.Затем он проходит весь диктат, чтобы вычислить ближайшего соседа (для моего теста я смотрю только 10 элементов из дикта).
В настоящее время я работаю на компьютере с Windows со следующим кодом:
import time
import multiprocessing as mp
import pickle
def find_closest_n_chunks(ids, p2v, n):
print("starting a chunk")
neighbors = []
for p in ids:
p2distance = {}
vector = p2v[p]
for key in list(p2v.keys())[:10]:
distance = cosine(vector, p2v[key])
p2distance[key] = distance
sorted_by_distance = sorted(p2distance.items(), key=lambda kv: kv[1])
neighbors.append(sorted_by_distance[:n])
print("finished chunk")
return neighbors
if __name__ == '__main__':
file = open('./prelim_results/DataHandlerC08.obj', "rb")
handler = pickle.load(file)
file = open("./prelim_results/vectorsC08.obj", "rb")
p2v = pickle.load(file)
ids = handler.evaluation_ids
chunks = []
chunk_p2v = []
for _ in range(8):
chunks.append([])
chunk_p2v.append(p2v.copy())
for idx, p in enumerate(ids):
array_idx = idx % 8
chunks[array_idx].append(p)
print("starting")
start = time.time()
with mp.Pool(processes=8, maxtasksperchild=1) as pool:
results = []
for idx, chunk in enumerate(chunks):
results.append(pool.apply_async(find_closest_n_chunks, (chunk, chunk_p2v[idx], 100)))
for result in results:
step = time.time()
print("step: {0}".format(step-start))
result.wait()
end = time.time()
print(end - start)
Это прекрасно работает при запуске нескольких различных процессов, однако они не работают параллельно на моем 4-ядерном ядре (8с многопоточностью) CPU.Выходные данные подтверждают это:
starting
step: 0.1266627311706543
starting a chunk
finished chunk
step: 15.884509086608887
starting a chunk
finished chunk
step: 24.54252290725708
starting a chunk
finished chunk
step: 33.269429445266724
starting a chunk
finished chunk
step: 42.065810680389404
Сообщение «запуск чанка» вызывается только после «завершения чанка», и каждый чанк занимает около 8-9 секунд, что означает, что каждый чанк будет запускаться только послеДругое закончено.
Обратите внимание, я уже пытался скопировать объект p2v, чтобы убедиться, что у меня нет общего объекта между задачами (как и ожидалось, поскольку он доступен только для чтения, это ничего не изменило).
Можете ли выуказать на любую ошибку, которую я сделал, пытаясь распараллелить ее?
Редактировать: добавлен импорт в начало кода и операторы pickle, которые загружают мои векторы и идентификаторы
Редактировать2: я поменял местами find_closest_n_chunk ()с использованием фиктивного метода:
def dummy():
print("start dummy")
time.sleep(5)
print("end dummy")
Это дало мне правильный вывод, где весь процесс будет запущен до того, как будет завершен первый.Затем я добавил time.sleep (20) к фактическому методу find_closest сразу после начальной строки печати.Это показало, что есть некоторое распараллеливание:
starting
step: 0.07980728149414062
starting a chunk
starting a chunk
starting a chunk
starting a chunk
finished chunk
step: 35.71958327293396
starting a chunk
finished chunk
step: 45.673808574676514
finished chunk
starting a chunk
finished chunk
step: 55.73376154899597
step: 55.73949694633484
starting a chunk
finished chunk
step: 64.74968791007996
starting a chunk
finished chunk
step: 73.77850985527039
finished chunk
step: 82.86862134933472
finished chunk
91.36373448371887
Но для появления каждого "запускающего чанка" требуется 5-10 секунд.Поэтому они не запускаются одновременно