Лучший способ реализовать многопроцессорную обработку через массивный Dataframe в Python - PullRequest
0 голосов
/ 02 июля 2019

У меня есть большой фрейм данных с данными, связанными с Active Directory, где я должен применить функцию к столбцу и сохранить выходные данные в список.

Одно из обнаруженных мной ограничений заключалось в том, что функция в ldap3нельзя напрямую использовать векторизованный ввод в oder, чтобы обойти этот подход - разделить фрейм данных на куски и применить к нему функцию.Функция будет применена к нескольким патронам одновременно (отдельные патчи обрабатываются последовательно)

У меня есть следующий код -

def searching_empid(user):
c = initiate_connection()
c.search(search_base='DC=xyz,DC=abc,DC=com',
         search_filter=f'(distinguishedName={ldap3.utils.conv.escape_filter_chars(user)})',
         search_scope=SUBTREE,
         attributes=['employeeID']
         )
id = c.entries[0].employeeID
return id

def vectorize_implementation(df):
    user_identifiers = df['Id'].values.astype(str)
    id_finder = lambda each_id : searching_empid(each_id)
    out=np.vectorize(id_finder, otypes=[list])
    try:
        out(user_identifiers)
    except ValueError:
        pass
    return out

if __name__ == '__main__':
    df = pd.read_pickle('Large_ADObject.pkl')
    n_proc = mp.cpu_count()
    chunksize = 30
    proc_chunks = []
    for i_proc in range(n_proc):
        chunkstart = i_proc * chunksize
        chunkend = (i_proc + 1) * chunksize if i_proc < n_proc - 1 else None
        proc_chunks.append(df.iloc[slice(chunkstart, chunkend)])
        assert sum(map(len, proc_chunks)) == len(df)

        with mp.Pool(processes=n_proc) as pool:
            proc_results = [pool.apply_async(vectorize_implementation,
                                             args=(chunk,))
                            for chunk in proc_chunks]
        #
        #     # blocks until all results are fetched
            result_chunks = [r.get() for r in proc_results]
            pool.close()
            pool.join()
            results = pd.concat(result_chunks)

        print(results)

Моя цель - получить все результаты водин объект, чтобы я мог использовать его для дальнейшей обработки.

Я сталкиваюсь со следующей ошибкой прямо сейчас при этом.Не могли бы вы, ребята, помочь мне решить эту ошибку.

multiprocessing.pool.MaybeEncodingError: Ошибка при отправке результата: ''.Причина: 'AttributeError ("Невозможно выбрать локальный объект" vectorize_implementation ..' ") '

...