У меня есть фрейм данных pyspark с тремя столбцами: user_id, follower_count и tweet, где твит имеет строковый тип.
Сначала мне нужно выполнить следующие шаги предварительной обработки: - нижний регистр всего текста - удалитьзнаки препинания (и любые другие символы, не входящие в ascii) - токенизировать слова (разделенные на '')
Затем мне нужно объединить эти результаты по всем значениям твита: - найти количество раз, которое встречалось каждое слово - отсортировать почастота - Извлечение топ-n слов и их соответствующих значений
Я нашел следующий ресурс wordcount.py на GitHub;однако я не понимаю, что делает код;из-за этого у меня возникают трудности с настройкой в моем блокноте.
https://github.com/apache/spark/blob/master/examples/src/main/python/wordcount.py
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
Редактировать 1: Я не думаю, что сделал это явно, что я пытаюсь применить этот анализ к столбцу, твит.
Редактировать 2: я изменил код выше, вставив df.tweet в качестве аргумента, передаваемого в первую строку кода, и вызвал ошибку. Поэтому я полагаю, что столбцы не могут быть переданы в этот рабочий процесс;и я не уверен, как с этим справиться.
TypeError: Column is not iterable
Я добавил некоторые корректировки в соответствии с рекомендациями. Работает как шарм! Я не знал, что могу отправить пользовательские функции в лямбда-функцию. Оказалось, что это простой способ добавить этот шаг в рабочий процесс.
import re
def process_text(text):
text = text.lower()
text = re.sub(pattern='[^A-z ^\s]',repl='',string=text).split(' ')
return [word for word in text if word != '']
process_text('hi 343')
>>>> ['hi']
count_rdd = df.select("tweet").rdd.flatMap(lambda x: process_text(x[0])) \
.map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
count_rdd.collect()