Распараллеливание нечеткого сопоставления строк с помощью циклов for с использованием Dask (с задержкой) - PullRequest
0 голосов
/ 26 июня 2019

У меня есть несколько фреймов данных панд (по одному в год), содержащих информацию о фирме.Моя общая цель - создать идентификатор для каждой компании, чтобы превратить базу данных с несколькими сечениями в структуру панели.Мой первоначальный подход состоял в том, чтобы сопоставить каждую компанию первого года наблюдения со всеми компаниями следующего года путем нечеткого сопоставления нескольких переменных (например, названия компании, местоположения головного офиса и т. Д.).Подход нечеткого соответствия необходим, потому что данные генерируются алгоритмом OCR, потенциально вызывающим орфографические ошибки.

Я написал код, который работал хорошо.Я даже смог распараллелить его на всех ядрах одного узла кластера, используя стандартную многопроцессорную библиотеку python.Тем не менее, для завершения кода все равно требуется около 40 дней.Поэтому идея состояла в том, чтобы применить более сложную процедуру и распараллелить код на нескольких узлах кластера, используя dask и dask.distributed.Я провел последние пару дней, пытаясь научиться использовать Dask.По-видимому, dask.dataframe не рекомендуется использовать для моих целей (поскольку я должен циклически проходить по каждой строке фрейма данных).Таким образом, мой подход заключался в создании списков из столбцов, которые должны совпадать.Впоследствии код проходит по каждому элементу из списка и выполняет нечеткое сопоставление в цикле for с использованием dask.delayed.

Результат из базового кода, который вы видите ниже, теперь представляет собой список отложенных функций.Теперь у меня вопрос: есть ли простой способ «распаковать» их, используя все доступные узлы и ядра, чтобы я мог видеть реальные цифры и потенциально вставлять их в кадр данных dask или pandas?

from dask import delayed
from time import sleep
from fuzzywuzzy import fuzz, process
import dask.dataframe as dd
import dask.array as da
import dask

m_list = ['aaa', 'bbb', 'ccc', 'ddd', 'eee']
s_list = ['ccc', 'ddd', 'eee', 'lll', 'kkk']


def my_function(i, slave_list):
    fuzzy_matches0 = []
    for j in slave_list:
        fuzzy_match = fuzz.ratio(str(i), str(j))
        fuzzy_matches0.append(fuzzy_match)
    return fuzzy_matches0


results = []
for i in m_list:
    x = dask.delayed(my_function)(i, s_list)
    results.append(x)


print(results)
...