Как разделить переменную счетчика между потоками с помощью threadpool.executor и увеличить его? - PullRequest
1 голос
/ 25 июня 2019

Ниже приведен исполнитель пула потоков, который я реализовал в python 3.x

  with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = {ex.submit(callreadline, files ): files for files in f.readlines() }

Переменная results содержит значения в следующем формате:

слов и соответствующих им 200-мерных вложений

Вы можете видеть, что значения являются кортежами. Первое значение - это слово, а второе - 200-мерный массив. Количество значений составляет 400000. Таким образом, есть 400000 кортежей.

Теперь я хочу создать другого исполнителя пула потоков, который выполняет следующую задачу

  1. Создать упорядоченный словарь из первых значений в списке кортежей. Это означает, что слова из первых четырех значений кортежа , есть, как сказано . Тогда заказанный словарь будет содержать:

{: 0, является: 1, являются: 2, сказал: 3, ... ............... .hello: 399999}

  1. Создать numy nd массив, который содержит 200-мерный массив соответствующих слов в упорядоченном словаре (Под соответствующим словом я подразумеваю, что первая запись будет состоять из 200-мерного массива слова , затем 200-мерного массива из - это ... и этот список можно продолжить). Таким образом, числовой массив будет иметь размер 400000 * 200.

Я использовал цикл for со следующим кодом

    count = 0
    word_to_idx = OrderedDict()
    vectors = []
    for future in results.result:
            b = future.result()
            word_to_idx[count] = b[0]
            if(count == 0):
                vectors =  np.array([b[1]])
            else:    
                vectors = np.append(vectors,np.array([b[1]]),axis=0)
            count = count +1

В конце вышеприведенной функции я вернул word_to_idx и векторы, которые сделали эту работу. Однако создание цикла из 400 000 кортежей и присвоение переменной по очереди заняло очень много времени (около 10 часов).

Так что я подумал, есть ли способ распараллелить эту функциональность так же с помощью исполнителя пула потоков.

Я думал о создании потоков, а затем делить переменную счетчика, чтобы каждый поток получал доступ к общей переменной по одному. Затем поток будет увеличивать эту переменную, а затем другой поток получит доступ к увеличенному счетчику . Может ли кто-нибудь указать мне правильное направление?

Edit:

Вот функция вызова readline, которая работает очень быстро, так как вызывается с 15 рабочими:

def callreadline(line):
        # word_to_idx = OrderedDict() 
        word_to_idx = OrderedDict()
        vectors = []
        vocabulary = None
        word_to_idx = read_w2v_word(line.split(' ')[0])
        try:
            vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
        except:
            vectors = np.array(line.split(' ')[1:],dtype=float)
        if vocabulary is not None:
            word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
        return word_to_idx,vectors

1 Ответ

0 голосов
/ 25 июня 2019

У меня такое ощущение, что функция callreadline также не настолько быстра, как могла бы быть, но это не было частью вопроса, поэтому позвольте мне попытаться исправить все остальное:

with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = [ex.submit(callreadline, files) for files in f.readlines()]

word_to_idx = dict()
vectors = []
for count, future in enumerate(results):
    b = future.result()
    word_to_idx[b[0]] = count
    vectors.append(b[1])

vectors = np.array(vectors)

...