python многопроцессорный пул в функции с несколькими входными параметрами, но только с одним итеративным - PullRequest
1 голос
/ 03 августа 2020

У меня есть функция с несколькими параметрами, iterable_token, dataframe, label_array. Однако только iterable_token может быть повторен в функции.

def cross_tab(label,token_presence):
    A_token=0
    B_token=0
    C_token=0
    D_token=0
    for i,j in zip(list(label),list(token_presence)):
        if i==True and j==True:
            A_token+=1
        elif i==False and j==False:
            D_token+=1
        elif i==True and j==False:
            C_token+=1
        elif i==False and j==True:
            B_token+=1
    return A_token,B_token,C_token,D_token

def My_ParallelFunction(iterable_token,dataframe,label_array):
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    token_presence_sum=0
    i=0
    
    for token in iterable_token:
        try:
            token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b')
            token_presence_sum=sum(token_presence)
            if token_presence_sum:
                A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
                A[token]=A_token
                B[token]=B_token
                C[token]=C_token
                D[token]=D_token
                token_count[token]=token_presence_sum
                token_list.append(token)
        except Exception as e:
            pass
    return (A,B,C,D,token_count,token_list)

Как распараллелить функцию My_ParallelFunction?

Edit1 : Я попробовал метод, предложенный в пример 1, потому что это то, что я ищу, чтобы распараллелить функцию.

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, (iterable_token, dataframe,label_array))

, но сообщение об ошибке

RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
<timed exec> in <module>

/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
    272         `func` and (a, b) becomes func(a, b).
    273         '''
--> 274         return self._map_async(func, iterable, starmapstar, chunksize).get()
    275 
    276     def starmap_async(self, func, iterable, chunksize=None, callback=None,

/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given

Edit2 : Вот файл, который я с помощью. Вы можете скачать его с здесь и разархивировать. Кроме того, запустите нижеприведенный сценарий, чтобы получить необходимые входные параметры. Обязательно установите nltk, pandas и numpy и измените путь к файлу TokenFile.csv.

from nltk import word_tokenize,sent_tokenize
import pandas as pd
import numpy as np

dataframe=pd.read_csv('/home/user/TokenFile.csv',nrows=100)

def get_uniquetoken(stop_words,input_doc_list):
    ##get unique words across all documents
    if stop_words:
        unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent) if word not in stop_words]
    else:
        unique_words=[word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent)]
    unique_words=set(unique_words)
    print('unique_words done! length is:',len(unique_words) )
    return unique_words


input_token_list=dataframe['Master'].tolist()
label_array=dataframe['label_array'].tolist()
iterable_token=get_uniquetoken(None,input_token_list)

Edit 3 Это решение, которое я использую

def My_ParallelFunction(iterable_token,dataframe,label_array):
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    i=0
    
    with mp.Pool(4) as p:
        token_result = p.starmap(_loop,[(token, dataframe, label_array,A,B,C,D,token_count,token_list) for token in iterable_token])
    #print(token_result[0])
    return token_result#(A,B,C,D,token_count,token_list)


def _loop(token, dataframe, label_array,A,B,C,D,token_count,token_list):
    #print(token)
    try:
        token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b')
        token_presence_sum=sum(token_presence)
        #print(token_presence_sum)
        if token_presence_sum:
            A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
            #print('token,A_token,B_token,C_token,D_token',token,A_token,B_token,C_token,D_token)
            A[token]=A_token
            B[token]=B_token
            C[token]=C_token
            D[token]=D_token
            token_count[token]=token_presence_sum
            token_list.append(token)
#             print('token_list:',token_list)
    except Exception as e:
        pass
    return A,B,C,D,token_count,token_list

Однако это не дает мне желаемого результата. Это 949 X 6 X разные_размеры матрица

Ответы [ 2 ]

2 голосов
/ 03 августа 2020

Вот два игрушечных примера, показывающих, как можно распараллелить подобную функцию.

Первый вариант. Если вы хотите распараллелить всю функцию. Вы можете сделать это, используя Pool.starmap () . .starmap () работает как map (), но вы можете передать ему несколько аргументов.

from multiprocessing import Pool
import time


#Example 1 Simple function parallelization
def f(a,b,c,_list):
    x = a+b+c
    time.sleep(1)
    _list.append(x)
    return _list

inputs = [
    (1,2,3,['a','b','c']),
    (1,2,3,['d','e','f']),
    (1,2,3,['x','y','z']),
    (1,2,3,['A','B','C']),
]

start = time.time()
with Pool(4) as p:
    results = p.starmap(f, inputs)
end = time.time()

for r in results:
    print(r)
    
print(f'done in {round(end-start, 3)} seconds')

Вывод:

['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds

Второй вариант. Если вы хотите распараллелить только for-l oop внутри функции. В этом случае вам следует переписать свой l oop как функцию и вызвать ее с помощью Pool.map () или Pool.starmap ().

#Example 2. Function calling a parallel function

#loop
def g(_string):
    time.sleep(1)
    return _string + '*'

#outer function
def f(a,b,c,_list):
    x = a+b+c
    _list.append(str(x))
    #loop parallelization
    with Pool(4) as p:
        new_list = p.map(g, _list)
    return new_list

start = time.time()
result = f(1,2,3,['a','b','c'])
end = time.time()

print(result)
print(f'done in {round(end-start, 3)} seconds')

Вывод:

['a*', 'b*', 'c*', '6*']
done in 1.048 seconds

Обратите внимание, что «l oop function» содержит logi c для работы с одним элементом итерации. Pool.map () выполнит его для всех элементов.

Вызовы time.sleep(1) предназначены для имитации некоторых трудоемких вычислений. Если распараллеливание работает, вы сможете обработать 4 ввода за 1 секунду, а не за 4 секунды.

Вот пример использования вашего кода:

def My_ParallelFunction(iterable_token, dataframe, label_array):

    with mp.Pool(4) as p:
        token_result = p.starmap(
            _loop,
            [(token, dataframe, label_array) for token in iterable_token]
        )
    return token_result


def _loop(token, dataframe, label_array):
    A={}
    B={}
    C={}
    D={}
    token_count = {}
    token_list = []
    try:
        
        token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b')
        token_presence_sum=sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token]=A_token
            B[token]=B_token
            C[token]=C_token
            D[token]=D_token
            token_count[token]=token_presence_sum
            token_list.append(token)
            return A,B,C,D,token_count,token_list

    except Exception as e:
        print(e)
0 голосов
/ 03 августа 2020

Что-то в этом роде должно сработать, если вы sh используете мультипроцессор только для l oop, а не для всей функции.

from multiprocessing import Pool

def My_ParallelFunction(iterable_token,dataframe,label_array):
    def get_token_counts(token,dataframe,label_array):
        try:
            token_presence=dataframe['Master'].str.contains('\\b'+token+'\\b')
            token_presence_sum=sum(token_presence)
            if token_presence_sum:
                A_token,B_token,C_token,D_token=cross_tab(label_array,token_presence)
                return token,A_token,B_token,C_token,D_token
        except Exception as e:
            print(e)
            pass
        
    A={}
    B={}
    C={}
    D={}
    token_count={}
    token_list=[]
    token_presence_sum=0
    i=0
    
    with Pool() as p:
        p_results = p.starmap(get_token_counts, [(token, dataframe, label_array) for token in iterable_token])
        
    for res in p_results:
        if res is None:
            continue
        token,A_token,B_token,C_token,D_token = res
        A[token]=A_token
        B[token]=B_token
        C[token]=C_token
        D[token]=D_token
        token_count[token]=token_presence_sum
        token_list.append(token)
    return (A,B,C,D,token_count,token_list)

Я удалил часть, в которой вы добавляете элементы в списки и словари из рабочей функции, так как вам придется заглядывать в очереди или общие объекты, чтобы многократно добавлять их к списку или dict. Это больше работы, но должно заставить ваш код работать немного быстрее (все зависит от того, сколько элементов у вас есть в вашей итерации и что требует много времени для вычисления).

Идея этого кода заключается в том, что вы создаете рабочая функция get_token_counts, которая будет выполняться внутри каждого потока, если у него есть token, dataframe и label_array. Возвращаемая часть функции содержит все элементы, необходимые для добавления ваших элементов в словарь (поскольку вы не можете точно знать, какой поток завершается первым, вы возвращаете с ним token, и он решает все ваши проблемы с индексацией. Хотя, возможно, starmap сохраняет порядок аргументов, поэтому, возможно, в этом нет необходимости).

После того, как все элементы были вычислены, вы переходите к добавлению их в свои списки и dicts.

Это в основном многопроцессорная обработка некоторых из функции фрейма данных вместе с cross_tab, а не совсем My_ParallelFunction.

Так как вы не привели примера, я не могу проверить код и придумать что-то получше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...