У меня есть код подсчета слов, который очень медленно работает на большом текстовом корпусе и взламывает мой компьютер.Я провел некоторый анализ и обнаружил, что виновниками являются ReduByKey () и takeOrdered ().См. Пример ниже:
text = [['A', 'B', 'C', 'D', 'E'],
['F', 'E', 'G', 'A', 'B'],
['D', 'E', 'H', 'A', 'B'],
['A', 'B', 'C', 'F', 'E'],
['A', 'B', 'C', 'J', 'E'],
['E', 'H', 'A', 'B', 'C'],
['E', 'G', 'A', 'B', 'C'],
['C', 'F', 'E', 'G', 'A'],
['C', 'D', 'E', 'H', 'A'],
['C', 'J', 'E', 'H', 'A'],
['H', 'A', 'B', 'C', 'F'],
['H', 'A', 'B', 'C', 'J'],
['B', 'C', 'F', 'E', 'G'],
['B', 'C', 'D', 'E', 'H'],
['B', 'C', 'F', 'E', 'K'],
['B', 'C', 'J', 'E', 'H'],
['G', 'A', 'B', 'C', 'F'],
['J', 'E', 'H', 'A', 'B']]
textRDD=sc.text.parallelize()
Вот мой код для подсчета слов:
cacheList = textRDD.flatMap(lambda x: x) \
.map(lambda word: (word,1))\
.reduceByKey(lambda a, b: a + b)\
.takeOrdered(10, key=lambda x: -x[1])
cacheList[:4]
вывод:
[('B', 15),
('C', 15),
('E', 15),
('A', 14),
('H', 9),
('F', 7),
('J', 5),
('G', 5)]
Мне нужно использовать эту программу для гораздо большеготекст, но он занимает вечность, как написано из-за операций reduByKey () и особенно takeOrdered ().Вот мое время, приведенное ниже: (время каждого этапа суммируется с предыдущими этапами), например, время «map» для этапа будет временем для «map» + время для «lowerByKey». Время указывается в мс.
times = np.array([2.24,4.64,20.4,185.0])
stages =['flatMap','map','reduce','takeOrdered']
plt.plot(stages,times,marker='s',color = 'red')
plt.grid(True)
Это выглядит довольно плохо.Насколько я понимаю, при гораздо большем корпусе слишком много пар ключ-значение выбрасываются за раз до того, как произойдет перемешивание.Мой компьютер не может справиться с этим.Это может быть тривиальный вопрос от человека, который только что научился кодировать в pyspark пару недель назад, но ... есть ли способ изменить этот код для борьбы с этой проблемой?