Использование concurrent.futures в операторе for - PullRequest
0 голосов
/ 05 мая 2020

Я храню QuertyText в кадре данных pandas. После того, как я загрузил все запросы, я хочу снова провести анализ каждого запроса. На данный момент у меня есть ~ 50к для оценки. Так что выполнение этого одного за другим займет много времени.

Итак, я хотел реализовать concurrent.futures. Как мне взять отдельный QueryText, хранящийся в fullAnalysis, как передать его concurrent.futures и вернуть результат в виде переменной?

Вот мой весь код:

import pandas as pd
import time
import gensim
import sys
import warnings

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

fullAnalysis = pd.DataFrame()

def fetch_data(jFile = 'ProcessingDetails.json'):
    print("Fetching data...please wait")

    #read JSON file for latest dictionary file name
    baselineDictionaryFileName = 'Dictionary/Dictionary_05-03-2020.json'

    #copy data to pandas dataframe
    labelled_data = pd.read_json(baselineDictionaryFileName)

    #Add two more columns to get the most similar text and score
    labelled_data['SimilarText'] = ''
    labelled_data['SimilarityScore'] = float()

    print("Data fetched from " + baselineDictionaryFileName + " and there are " + str(labelled_data.shape[0]) + " rows to be evalauted")

    return labelled_data


def calculateScore(inputFunc):
    warnings.filterwarnings("ignore", category=DeprecationWarning) 

    model = gensim.models.Word2Vec.load('w2v_model_bigdata')

    inp = inputFunc
    print(inp)
    out = dict()

    strEvaluation = inp.split("most_similar ",1)[1]

    #while inp != 'quit':
    split_inp = inp.split()

    try:
        if split_inp[0] == 'help':
            pass
        elif split_inp[0] == 'similarity' and len(split_inp) >= 3:
            pass
        elif split_inp[0] == 'most_similar' and len(split_inp) >= 2:
            for pair in model.most_similar(positive=[split_inp[1]]):
                out.update({pair[0]: pair[1]})

    except KeyError as ke:
        #print(str(ke) + "\n")
        inp = input()
    return out

def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            #for item in executor.map(calculateScore, arg):
            output = executor.map(calculateScore, arg)

    return output

if __name__ == "__main__":
    fullAnalysis = fetch_data()
    results = main()
    print(f'results: {results}')

Ответы [ 2 ]

0 голосов
/ 06 мая 2020

С помощью Booboo я смог обновить код, включив в него ProcessPoolExecutor. Вот мой обновленный код. В целом обработка данных увеличилась более чем на 60%.

Я столкнулся с проблемой обработки и нашел этот topi c BrokenPoolProcess , который решает эту проблему.

output = {}
thePool = {}

def main(labelled_data, dictionaryRevised):

    args = sys.argv[1:]

    with ProcessPoolExecutor(max_workers=None) as executor:
        for i in range(len(labelled_data)):
            text = labelled_data['QueryText'][i]
            arg = 'most_similar'+ ' '+ text

            output = winprocess.submit(
            executor, calculateScore, arg
            )
            thePool[output] = i  #original index for future to request


        for output in as_completed(thePool): # results as they become available not necessarily the order of submission
            i = thePool[output] # the original index
            text = labelled_data['QueryText'][i]
            result = output.result() # the result

            maximumKey = max(result.items(), key=operator.itemgetter(1))[0]
            maximumValue = result.get(maximumKey)

            labelled_data['SimilarText'][i] = maximumKey
            labelled_data['SimilarityScore'][i] = maximumValue


    return labelled_data, dictionaryRevised

if __name__ == "__main__":
    start = time.perf_counter()

    print("Starting to evaluate Query Text for labelling...")

    output_Labelled_Data, output_dictionary_revised = preProcessor()

    output,dictionary = main(output_Labelled_Data, output_dictionary_revised)


    finish = time.perf_counter()
    print(f'Finished in {round(finish-start, 2)} second(s)')
0 голосов
/ 05 мая 2020

Глобальная блокировка интерпретатора Python или GIL позволяет только одному потоку удерживать управление интерпретатором Python. Поскольку ваша функция calculateScore может быть привязана к процессору и требует, чтобы интерпретатор выполнял свой байтовый код, вы можете немного выиграть от использования потоковой передачи. С другой стороны, если бы он выполнял в основном операции ввода-вывода, он бы отказался от GIL большую часть своего времени выполнения, позволяя запускать другие потоки. Но, похоже, здесь дело обстоит не так. Вероятно, вам следует использовать ProcessPoolExecutor из concurrent.futures (попробуйте оба способа и посмотрите):

def main():
    with ProcessPoolExecutor(max_workers=None) as executor:
        the_futures = {}
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures[future] = i # map future to request
        for future in as_completed(the_futures): # results as they become available not necessarily the order of submission
            i = the_futures[future] # the original index
            result = future.result() # the result

Если вы опустите параметр max_workers (или укажите значение None) из конструктор ProcessPoolExecutor, по умолчанию будет количество процессоров, установленных на вашем компьютере (неплохое значение по умолчанию). Нет смысла указывать значение, превышающее количество имеющихся у вас процессоров.

Если вам не нужно ie возвращать будущее к исходному запросу, тогда the_futures может быть просто списком к которому Но проще всего даже не беспокоиться об использовании метода as_completed:

def main():
    with ProcessPoolExecutor(max_workers=5) as executor:
        the_futures = []
        for i in range(len(fullAnalysis)):
            text = fullAnalysis['QueryText'][i]
            arg = 'most_similar'+ ' ' + text
            future = executor.submit(calculateScore, arg)
            the_futures.append(future)
        # wait for the completion of all the results and return them all:
        results = [f.result() for f in the_futures()] # results in creation order
        return results 

Следует отметить, что код, запускающий функции ProcessPoolExecutor, должен находиться в блоке, управляемом if __name__ = '__main__':. Если это не так, вы попадете в рекурсивный l oop, где каждый подпроцесс будет запускать ProcessPoolExecutor. Но, похоже, здесь так и есть. Возможно, вы все время хотели использовать ProcessPoolExecutor?

Также:

Я не знаю, что это за строка ...

model = gensim.models.Word2Vec.load('w2v_model_bigdata')

... в функция calculateStore делает. Это может быть один оператор с привязкой к вводу-выводу. Но, похоже, это то, что не меняется от звонка к звонку. Если это так и model не изменяется в функции, не следует ли вынести этот оператор из функции и вычислить только один раз? Тогда эта функция явно будет работать быстрее (и будет явно связана с процессором).

Также:

Блок исключений ...

except KeyError as ke:
    #print(str(ke) + "\n")
    inp = input()

... вызывает недоумение . Вы вводите значение, которое никогда не будет использоваться, прямо перед возвратом. Если это нужно для приостановки выполнения, сообщение об ошибке не выводится.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...