Таким образом, решение в конечном итоге стало распараллеливанием, но я не смог выяснить это с помощью специфических библиотек Panda для распараллеливания, так как предполагаемый результат был не преобразованием существующего содержимого ячейки, а новым значением, полученным из другого фрейма данных.
Я взял библиотеку joblib и предпринял следующие шаги:
Во-первых, я создал функцию, которая, учитывая два идентификатора, могла бы возвращать строку для этого индекса (поскольку отдельные работники не могут изменять структуру данных в основном процессе, вместо этого мы должны перейти к парадигме генерации всех данных сначала ТОГДА строит фрейм данных):
def get_distance(df, id1, id2):
return [id1, id2, distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])]
и примененное к нему распараллеливание JobLib:
def get_distances(df):
indexes = df.index
total_combinations = combination_count(len(indexes), 2)
current_combination = 0
print('There are %d possible inter-message relationships to compute' % total_combinations)
data = Parallel(n_jobs=-1)(delayed(get_distance)(df, min(ids), max(ids)) for ids in combinations(indexes, 2))
distances = pd.DataFrame(data, columns=[id_column_1, id_column_2, distance_column])
distances.set_index([id_column_1, id_column_2], inplace=True)
return distances
Это дало немедленное улучшение от месяцев к дням ожидаемого времени, но я подозревал, что пропуск полного кадра данных будет создавать значительные накладные расходы.
После изменения функции для передачи только требуемых значений было достигнуто другое немедленное улучшение до менее чем дня (~ 20 часов):
def get_distance(id1, id2, embed1, embed2):
return [id1, id2, distance.euclidean(embed1, embed2)]
# ...later, in get_distances()...
data = Parallel(n_jobs=-1)(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in combinations(indexes, 2))
Наконец, основываясь на документах joblib и том факте, что значительный объем данных все еще передается работникам, я переключился на многопроцессорную серверную часть и увидел, что ожидаемое время сократилось до ~ 1,5 часа. , (Я также добавил библиотеку tqdm, чтобы получить лучшее представление о прогрессе, чем то, что предоставляет joblib)
data = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in tqdm(combinations(indexes, 2), total=total_combinations))
Надеюсь, это поможет кому-то еще в их первом набеге на распараллеливание Python!