Счетчик слов с pyspark - PullRequest
       5

Счетчик слов с pyspark

0 голосов
/ 22 октября 2019

У меня есть фрейм данных pyspark с тремя столбцами: user_id, follower_count и tweet, где твит имеет строковый тип.

Сначала мне нужно выполнить следующие шаги предварительной обработки: - нижний регистр всего текста - удалитьзнаки препинания (и любые другие символы, не входящие в ascii) - токенизировать слова (разделенные на '')

Затем мне нужно объединить эти результаты по всем значениям твита: - найти количество раз, которое встречалось каждое слово - отсортировать почастота - Извлечение топ-n слов и их соответствующих значений

Я нашел следующий ресурс wordcount.py на GitHub;однако я не понимаю, что делает код;из-за этого у меня возникают трудности с настройкой в ​​моем блокноте.

https://github.com/apache/spark/blob/master/examples/src/main/python/wordcount.py

lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

Редактировать 1: Я не думаю, что сделал это явно, что я пытаюсь применить этот анализ к столбцу, твит.

Редактировать 2: я изменил код выше, вставив df.tweet в качестве аргумента, передаваемого в первую строку кода, и вызвал ошибку. Поэтому я полагаю, что столбцы не могут быть переданы в этот рабочий процесс;и я не уверен, как с этим справиться.

TypeError: Column is not iterable

Я добавил некоторые корректировки в соответствии с рекомендациями. Работает как шарм! Я не знал, что могу отправить пользовательские функции в лямбда-функцию. Оказалось, что это простой способ добавить этот шаг в рабочий процесс.

import re
def process_text(text):
  text = text.lower()
  text = re.sub(pattern='[^A-z ^\s]',repl='',string=text).split(' ')
  return [word for word in text if word != '']

process_text('hi 343')
>>>> ['hi']

count_rdd = df.select("tweet").rdd.flatMap(lambda x: process_text(x[0])) \
              .map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

count_rdd.collect()

1 Ответ

1 голос
/ 22 октября 2019

Не уверен, что ошибка вызвана for (word, count) in output: или операциями RDD над столбцом.

НО, вы можете просто использовать:

Для стиля RDD:

count_rdd = df.select("tweets").rdd.flatMap(lambda x: x[0].split(' ')) \
              .map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

То, что вы пытаетесь сделать, - это операции RDD над pyspark.sql.column.Column объектом. Выше приведен простой подсчет слов для всех слов в столбце.

Если вы хотите указать его в самом столбце, вы можете сделать это с помощью explode () :

Для стиля столбца:

import pyspark.sql.functions as F

count_df = df.withColumn('word', F.explode(F.split(F.col('tweets'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

Вы можете использовать regexp_replace() и lower() из pyspark.sql.functions для выполнения шагов предварительной обработки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...