PySpark - Выберите строки, в которых столбец имеет непоследовательные значения после группировки - PullRequest
0 голосов
/ 10 января 2019

У меня есть датафрейм в форме:

|user_id| action | day |
------------------------
| d25as | AB     | 2   |
| d25as | AB     | 3   |
| d25as | AB     | 5   |
| m3562 | AB     | 1   |
| m3562 | AB     | 7   |
| m3562 | AB     | 9   |
| ha42a | AB     | 3   |
| ha42a | AB     | 4   |
| ha42a | AB     | 5   |

Я хочу отфильтровать пользователей, которые видны в последующие дни, если они не видны хотя бы в один непоследовательный день. Результирующий кадр данных должен быть:

|user_id| action | day |
------------------------
| d25as | AB     | 2   |
| d25as | AB     | 3   |
| d25as | AB     | 5   |
| m3562 | AB     | 1   |
| m3562 | AB     | 7   |
| m3562 | AB     | 9   |

где последний пользователь был удален, так как он появлялся только в последующие дни. Кто-нибудь знает, как это можно сделать в искре?

Ответы [ 2 ]

0 голосов
/ 10 января 2019

Использование оконных функций spark-sql и без каких-либо файлов. Конструкция df выполняется в scala, но часть sql будет такой же в python. Проверьте это:

val df = Seq(("d25as","AB",2),("d25as","AB",3),("d25as","AB",5),("m3562","AB",1),("m3562","AB",7),("m3562","AB",9),("ha42a","AB",3),("ha42a","AB",4),("ha42a","AB",5)).toDF("user_id","action","day")
df.createOrReplaceTempView("qubix")
spark.sql(
  """ with t1( select user_id, action, day, row_number() over(partition by user_id order by day)-day diff from qubix),
           t2( select user_id, action, day, collect_set(diff) over(partition by user_id) diff2 from t1)
                select user_id, action, day from t2 where size(diff2) > 1
  """).show(false)

Результаты:

+-------+------+---+
|user_id|action|day|
+-------+------+---+
|d25as  |AB    |2  |
|d25as  |AB    |3  |
|d25as  |AB    |5  |
|m3562  |AB    |1  |
|m3562  |AB    |7  |
|m3562  |AB    |9  |
+-------+------+---+

версия pyspark

>>> from pyspark.sql.functions import  *
>>> values = [('d25as','AB',2),('d25as','AB',3),('d25as','AB',5),
...           ('m3562','AB',1),('m3562','AB',7),('m3562','AB',9),
...           ('ha42a','AB',3),('ha42a','AB',4),('ha42a','AB',5)]
>>> df = spark.createDataFrame(values,['user_id','action','day'])
>>> df.show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|  d25as|    AB|  2|
|  d25as|    AB|  3|
|  d25as|    AB|  5|
|  m3562|    AB|  1|
|  m3562|    AB|  7|
|  m3562|    AB|  9|
|  ha42a|    AB|  3|
|  ha42a|    AB|  4|
|  ha42a|    AB|  5|
+-------+------+---+

>>> df.createOrReplaceTempView("qubix")
>>> spark.sql(
...   """ with t1( select user_id, action, day, row_number() over(partition by user_id order by day)-day diff from qubix),
...            t2( select user_id, action, day, collect_set(diff) over(partition by user_id) diff2 from t1)
...                 select user_id, action, day from t2 where size(diff2) > 1
...   """).show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|  d25as|    AB|  2|
|  d25as|    AB|  3|
|  d25as|    AB|  5|
|  m3562|    AB|  1|
|  m3562|    AB|  7|
|  m3562|    AB|  9|
+-------+------+---+

>>>
0 голосов
/ 10 января 2019

Читайте комментарии между ними. Код будет понятен сам по себе.

from pyspark.sql.functions import udf, collect_list, explode
#Creating the DataFrame
values = [('d25as','AB',2),('d25as','AB',3),('d25as','AB',5),
          ('m3562','AB',1),('m3562','AB',7),('m3562','AB',9),
          ('ha42a','AB',3),('ha42a','AB',4),('ha42a','AB',5)]
df = sqlContext.createDataFrame(values,['user_id','action','day'])
df.show() 
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|  d25as|    AB|  2|
|  d25as|    AB|  3|
|  d25as|    AB|  5|
|  m3562|    AB|  1|
|  m3562|    AB|  7|
|  m3562|    AB|  9|
|  ha42a|    AB|  3|
|  ha42a|    AB|  4|
|  ha42a|    AB|  5|
+-------+------+---+

# Grouping together the days in one list.
df = df.groupby(['user_id','action']).agg(collect_list('day'))
df.show()
+-------+------+-----------------+
|user_id|action|collect_list(day)|
+-------+------+-----------------+
|  ha42a|    AB|        [3, 4, 5]|
|  m3562|    AB|        [1, 7, 9]|
|  d25as|    AB|        [2, 3, 5]|
+-------+------+-----------------+

# Creating a UDF to check if the days are consecutive or not. Only keep False ones.
check_consecutive = udf(lambda row: sorted(row) == list(range(min(row), max(row)+1)))
df = df.withColumn('consecutive',check_consecutive(col('collect_list(day)')))\
      .where(col('consecutive')==False)
df.show()
+-------+------+-----------------+-----------+
|user_id|action|collect_list(day)|consecutive|
+-------+------+-----------------+-----------+
|  m3562|    AB|        [1, 7, 9]|      false|
|  d25as|    AB|        [2, 3, 5]|      false|
+-------+------+-----------------+-----------+

# Finally, exploding the DataFrame from above to get the result.
df = df.withColumn("day", explode(col('collect_list(day)')))\
       .drop('consecutive','collect_list(day)')
df.show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|  m3562|    AB|  1|
|  m3562|    AB|  7|
|  m3562|    AB|  9|
|  d25as|    AB|  2|
|  d25as|    AB|  3|
|  d25as|    AB|  5|
+-------+------+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...