Распараллелить / векторизовать вычисления комбинаций из Pandas Dataframe - PullRequest
0 голосов
/ 04 апреля 2019

У меня есть несколько фреймов данных маринованных панд с приличным количеством строк в каждом (~ 10k). Один из столбцов информационного кадра - это небольшой массив с плавающей запятой (Да, я специально решил хранить данные массива в одной ячейке - я читал, что это обычно не правильный путь, например. здесь , но в этом случае отдельные значения не имеют смысла, значение имеет только полный список значений, поэтому я думаю, что в этом случае это имеет смысл). Мне нужно рассчитать евклидово расстояние между каждой парой строк в кадре. У меня есть рабочий код для этого, но я надеюсь, что смогу что-то сделать, чтобы улучшить его производительность, так как сейчас он говорит мне, что мой меньший набор данных займет> месяц, но я уверен, что это займет вся моя память задолго до этого.

Код выглядит следующим образом:

import pandas as pd
import sys
import getopt
import math
from scipy.spatial import distance
from timeit import default_timer as timer
from datetime import timedelta

id_column_1 = 'id1'
id_column_2 = 'id2'
distance_column = 'distance'
val_column = 'val'

# where n is the size of the set
# and k is the number of elements per combination
def combination_count(n, k):
    if k > n:
        return 0
    else:
        # n! / (k! * (n - k)!)
        return math.factorial(n)/(math.factorial(k) * math.factorial(n - k))

def progress(start, current, total, id1, id2):
    if current == 0:
        print('Processing combination #%d of #%d, (%d, %d)' % (current, total, id1, id2))
    else:
        percent_complete = 100 * float(current)/float(total)
        elapsed_time = timer() - start
        avg_time = elapsed_time / current
        remaining = total - current
        remaining_time = timedelta(seconds=remaining * avg_time)
        print('Processing combination #%d of #%d, (%d, %d). %.2f%% complete, ~%.2f s/combination, ~%s remaining' % (current, total, id1, id2, percent_complete, avg_time, remaining_time))

def check_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    current_combination = 0
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    distances = pd.DataFrame(columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    start = timer()
    for id1 in indexes:
        for id2 in indexes:
            # id1 is always < id2
            if id1 >= id2:
                continue
            progress(start, current_combination, total_combinations, id1, id2)
            distances.loc[(id1, id2), distance_column] = distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])
            current_combination+=1

(я исключил функцию main (), которая просто извлекает аргументы и загружает в них файлы маринованных файлов)

Я только недавно начал работать с Python для этой задачи, так что есть все шансы, что я упускаю что-то простое, есть ли хороший способ справиться с этим?

Ответы [ 2 ]

1 голос
/ 04 апреля 2019

Есть несколько вариантов параллельного вычисления фреймов данных в чистом питоне.
Наиболее полным может быть dask
Более простым, но более простым вариантом будет pandaral-lel

0 голосов
/ 04 апреля 2019

Таким образом, решение в конечном итоге стало распараллеливанием, но я не смог выяснить это с помощью специфических библиотек Panda для распараллеливания, так как предполагаемый результат был не преобразованием существующего содержимого ячейки, а новым значением, полученным из другого фрейма данных.

Я взял библиотеку joblib и предпринял следующие шаги:

Во-первых, я создал функцию, которая, учитывая два идентификатора, могла бы возвращать строку для этого индекса (поскольку отдельные работники не могут изменять структуру данных в основном процессе, вместо этого мы должны перейти к парадигме генерации всех данных сначала ТОГДА строит фрейм данных):

def get_distance(df, id1, id2):
    return [id1, id2, distance.euclidean(df.loc[id1, embeddings_column], df.loc[id2, embeddings_column])]

и примененное к нему распараллеливание JobLib:

def get_distances(df):
    indexes = df.index
    total_combinations = combination_count(len(indexes), 2)
    current_combination = 0
    print('There are %d possible inter-message relationships to compute' % total_combinations)
    data = Parallel(n_jobs=-1)(delayed(get_distance)(df, min(ids), max(ids)) for ids in combinations(indexes, 2))
    distances = pd.DataFrame(data, columns=[id_column_1, id_column_2, distance_column])
    distances.set_index([id_column_1, id_column_2], inplace=True)
    return distances

Это дало немедленное улучшение от месяцев к дням ожидаемого времени, но я подозревал, что пропуск полного кадра данных будет создавать значительные накладные расходы.

После изменения функции для передачи только требуемых значений было достигнуто другое немедленное улучшение до менее чем дня (~ 20 часов):

def get_distance(id1, id2, embed1, embed2):
    return [id1, id2, distance.euclidean(embed1, embed2)]

# ...later, in get_distances()...

data = Parallel(n_jobs=-1)(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in combinations(indexes, 2))

Наконец, основываясь на документах joblib и том факте, что значительный объем данных все еще передается работникам, я переключился на многопроцессорную серверную часть и увидел, что ожидаемое время сократилось до ~ 1,5 часа. , (Я также добавил библиотеку tqdm, чтобы получить лучшее представление о прогрессе, чем то, что предоставляет joblib)

data = Parallel(n_jobs=-1, backend='multiprocessing')(delayed(get_distance)(min(ids), max(ids), df.loc[ids[0], embeddings_column], df.loc[ids[1], embeddings_column]) for ids in tqdm(combinations(indexes, 2), total=total_combinations))

Надеюсь, это поможет кому-то еще в их первом набеге на распараллеливание Python!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...