Повысить производительность моей программы pyspark для фильтра данных - PullRequest
1 голос
/ 16 мая 2019

Я хочу повысить производительность своего приложения с помощью pyspark.

Фрейм данных выглядит следующим образом: каждый столбец представляет комментарий, опубликованный пользователем XXX для вопроса XXX.

+-----------+--------------+----------+

|user_id    | comment_id   |question_id| 

+-----------+--------------+----------+

|   10181831|R2OX4JVN03FOMT|6302014700| 

|   51697036|R1R4GTQCO51GC0|B0000001NY| 

Теперь я хочу удалить комментарии, опубликованные пользователями, у которых опубликовано менее 1 комментария, и отзывы на вопросы, содержащие менее 3 комментариев

Мой код такой:

window_u = Window.partitionBy("user_id")
window_p = Window.partitionBy("question_id")


reviews = reviews.withColumn("n", 
count("user_id").over(window_u)).filter("n >= 1").drop("n")
reviews = reviews.withColumn("n", 
count("question_id").over(window_p)).filter("n >= 3").drop("n")

Проблема в том, что теперь это приложение работает очень медленно, так как у меня действительно большие наборы данных. Есть ли проблемы с увеличением производительности?

Ответы [ 2 ]

0 голосов
/ 16 мая 2019

Вы можете попробовать этот подход -

1.Создайте список elitable_user и elitable_questions на основе желаемого условия

2.Используйте isin() для фильтрации пользователей и вопросов, которые не соответствуют условию.

Вот подробный пример:

eligible_users = df\
    .groupBy('user_id')\
    .count().\
    filter("count>=1")\
    .select('user_id')\
    .rdd.map(lambda row : row[0])\
    .collect()

eligible_questions = df\
    .groupBy('question_id')\
    .count()\
    .filter("count>=3")\
    .select('question_id')\
    .rdd.map(lambda row : row[0])\
    .collect()

from pyspark.sql.functions import col

df\
    .where(col('user_id').isin(eligible_users) & col('question_id').isin(eligible_questions))\
    .show()
0 голосов
/ 16 мая 2019

вы можете напрямую отфильтровать нулевые записи для менее чем 1 записи, а затем выполнить запрос раздела, который наверняка будет выполняться быстрее, чем два раздела по запросам, и в вашем запросе есть ошибка, я думаю, что вы хотите посчитать comment_id вместо question_id

import pyspark.sql.functions as F
from pyspark.sql import Window

window_p = Window.partitionBy("question_id")

reviews = reviews.filter(
    F.col("comment_id).isNotNull()
).withColumn(
    "n", 
    F.count("comment_id").over(window_p)
).filter("n >= 3").drop("n")

или вы можете выполнить групповую операцию после первой фильтрации пустых записей

reviews.filter(
        F.col("comment_id).isNotNull()
).groupby(
  "question_id", 
   F.count("comment_id").alias("n")
).filter("n >= 3").drop("n")

это будет работать быстрее, чем по разделу, но вам нужно снова присоединить его к основной таблице, если вам нужны все столбцы

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...