Ниже приведен исполнитель пула потоков, который я реализовал в python 3.x
with ThreadPoolExecutor(max_workers=15) as ex:
f = open(filename, 'r', encoding='UTF-8')
results = {ex.submit(callreadline, files ): files for files in f.readlines() }
Переменная results содержит значения в следующем формате:
слов и соответствующих им 200-мерных вложений
Вы можете видеть, что значения являются кортежами. Первое значение - это слово, а второе - 200-мерный массив. Количество значений составляет 400000. Таким образом, есть 400000 кортежей.
Теперь я хочу создать другого исполнителя пула потоков, который выполняет следующую задачу
- Создать упорядоченный словарь из первых значений в списке кортежей. Это означает, что слова из первых четырех значений кортежа , есть, как сказано . Тогда заказанный словарь будет содержать:
{: 0, является: 1, являются: 2, сказал: 3, ... ............... .hello: 399999}
- Создать numy nd массив, который содержит 200-мерный массив соответствующих слов в упорядоченном словаре (Под соответствующим словом я подразумеваю, что первая запись будет состоять из 200-мерного массива слова , затем 200-мерного массива из - это ... и этот список можно продолжить). Таким образом, числовой массив будет иметь размер 400000 * 200.
Я использовал цикл for со следующим кодом
count = 0
word_to_idx = OrderedDict()
vectors = []
for future in results.result:
b = future.result()
word_to_idx[count] = b[0]
if(count == 0):
vectors = np.array([b[1]])
else:
vectors = np.append(vectors,np.array([b[1]]),axis=0)
count = count +1
В конце вышеприведенной функции я вернул word_to_idx и векторы, которые сделали эту работу. Однако создание цикла из 400 000 кортежей и присвоение переменной по очереди заняло очень много времени (около 10 часов).
Так что я подумал, есть ли способ распараллелить эту функциональность так же с помощью исполнителя пула потоков.
Я думал о создании потоков, а затем делить переменную счетчика, чтобы каждый поток получал доступ к общей переменной по одному. Затем поток будет увеличивать эту переменную, а затем другой поток получит доступ к увеличенному счетчику . Может ли кто-нибудь указать мне правильное направление?
Edit:
Вот функция вызова readline, которая работает очень быстро, так как вызывается с 15 рабочими:
def callreadline(line):
# word_to_idx = OrderedDict()
word_to_idx = OrderedDict()
vectors = []
vocabulary = None
word_to_idx = read_w2v_word(line.split(' ')[0])
try:
vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
except:
vectors = np.array(line.split(' ')[1:],dtype=float)
if vocabulary is not None:
word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
return word_to_idx,vectors