Pyspark - Выберите пользователей, которых видели как минимум 2 дня подряд - PullRequest
0 голосов
/ 09 февраля 2019

У меня есть фрейм данных dataframe_actions с полями: user_id, action, day.user_id уникален для каждого пользователя, а day принимает значения от 1 до 31. Я хочу отфильтровать только тех пользователей, которых видели не менее 2 дней подряд, например:

Если пользовательвидел в дни 1,2,4,8,9, я хочу сохранить их, так как они видны по крайней мере 2 дня подряд.

То, что я делаю сейчас, неуклюже и очень медленно (и, похоже, не работает):

df_final = spark.sql(""" with t1( select user_id, day, row_number()
           over(partition by user_id order by day)-day diff from dataframe_actions), 
           t2( select user_id, day, collect_set(diff) over(partition by user_id) diff2 from t1) 
           select user_id, day from t2 where size(diff2) > 2""")

Что-то в этом роде, но я не знаю, как это исправить.

РЕДАКТИРОВАТЬ:

| user_id | action | day |
--------------------------
| asdc24  | conn   |  1  |
| asdc24  | conn   |  2  |
| asdc24  | conn   |  5  |
| adsfa6  | conn   |  1  |
| adsfa6  | conn   |  3  |
| asdc24  | conn   |  9  |
| adsfa6  | conn   |  5  |
| asdc24  | conn   |  11 |
| adsfa6  | conn   |  10 |
| asdc24  | conn   |  15 |

должен вернуть

| user_id | action | day |
--------------------------
| asdc24  | conn   |  1  |
| asdc24  | conn   |  2  |
| asdc24  | conn   |  5  |
| asdc24  | conn   |  9  |
| asdc24  | conn   |  11 |
| asdc24  | conn   |  15 |

, поскольку только этот пользователь был подключен в течение минимум двух последовательных дней (дней1 и 2).

Ответы [ 2 ]

0 голосов
/ 09 февраля 2019

Другой подход SQL с использованием заданных входных данных.

Pyspark

>>> from pyspark.sql.functions import *
>>> df = sc.parallelize([("asdc24","conn",1),
... ("asdc24","conn",2),
... ("asdc24","conn",5),
... ("adsfa6","conn",1),
... ("adsfa6","conn",3),
... ("asdc24","conn",9),
... ("adsfa6","conn",5),
... ("asdc24","conn",11),
... ("adsfa6","conn",10),
... ("asdc24","conn",15)]).toDF(["user_id","action","day"])
>>> df.createOrReplaceTempView("qubix")
>>> spark.sql(" select * from qubix order by user_id, day").show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
| adsfa6|  conn|  1|
| adsfa6|  conn|  3|
| adsfa6|  conn|  5|
| adsfa6|  conn| 10|
| asdc24|  conn|  1|
| asdc24|  conn|  2|
| asdc24|  conn|  5|
| asdc24|  conn|  9|
| asdc24|  conn| 11|
| asdc24|  conn| 15|
+-------+------+---+

>>> spark.sql(""" with t1 (select user_id,action, day,lead(day) over(partition by user_id order by day) ld from qubix), t2 (select user_id from t1 where ld-t1.day=1 ) select * from qubix where user_id in (select user_id from t2) """).show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
| asdc24|  conn|  1|
| asdc24|  conn|  2|
| asdc24|  conn|  5|
| asdc24|  conn|  9|
| asdc24|  conn| 11|
| asdc24|  conn| 15|
+-------+------+---+

>>>

Версия Scala

scala> val df = Seq(("asdc24","conn",1),
     | ("asdc24","conn",2),
     | ("asdc24","conn",5),
     | ("adsfa6","conn",1),
     | ("adsfa6","conn",3),
     | ("asdc24","conn",9),
     | ("adsfa6","conn",5),
     | ("asdc24","conn",11),
     | ("adsfa6","conn",10),
     | ("asdc24","conn",15)).toDF("user_id","action","day")
df: org.apache.spark.sql.DataFrame = [user_id: string, action: string ... 1 more field]

scala> df.orderBy('user_id,'day).show(false)
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|adsfa6 |conn  |1  |
|adsfa6 |conn  |3  |
|adsfa6 |conn  |5  |
|adsfa6 |conn  |10 |
|asdc24 |conn  |1  |
|asdc24 |conn  |2  |
|asdc24 |conn  |5  |
|asdc24 |conn  |9  |
|asdc24 |conn  |11 |
|asdc24 |conn  |15 |
+-------+------+---+


scala> df.createOrReplaceTempView("qubix")

scala> spark.sql(""" with t1 (select user_id,action, day,lead(day) over(partition by user_id order by day) ld from qubix), t2 (select user_id fro
m t1 where ld-t1.day=1 ) select * from qubix where user_id in (select user_id from t2) """).show(false)
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|asdc24 |conn  |1  |
|asdc24 |conn  |2  |
|asdc24 |conn  |5  |
|asdc24 |conn  |9  |
|asdc24 |conn  |11 |
|asdc24 |conn  |15 |
+-------+------+---+


scala>
0 голосов
/ 09 февраля 2019

Используйте lag, чтобы получить предыдущий день для каждого пользователя, вычтите его из дня текущей строки, а затем проверьте, равен ли один из них 1. Это делается с group by и filter после этого.

from pyspark.sql import functions as f
from pyspark.sql import Window
w = Window.partitionBy(dataframe_actions.user_id).orderBy(dataframe_actions.day)
user_prev = dataframe_actions.withColumn('prev_day_diff',dataframe_actions.day-f.lag(dataframe_actions.day).over(w))
res = user_prev.groupBy(user_prev.user_id).agg(f.sum(f.when(user_prev.prev_day_diff==1,1).otherwise(0)).alias('diff_1'))
res.filter(res.diff_1 >= 1).show()

Подходит еще один способ с разницей номеров строк.Это позволит выбрать все столбцы для данного идентификатора пользователя.

w = Window.partitionBy(dataframe_actions.user_id).orderBy(dataframe_actions.day)
rownum_diff = dataframe_actions.withColumn('rdiff',day-f.row_number().over(w))
w1 = Window.partitionBy(rownum_diff.user_id)
counts_per_user = rownum_diff.withColumn('cnt',f.sum(f.when(rownum_diff.rdiff == 1,1).otherwise(0)).over(w1))
cols_to_select = ['user_id','action','day']
counts_per_user.filter(counts_per_user.cnt >= 1).select(*cols_to_select).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...