scala spark count число изменений во времени в DataFrame - PullRequest
0 голосов
/ 11 января 2019

например у меня есть этот тип DataFrame:

val DF = Seq((10, "id1",1), 
(20, "id1",6), 
(30, "id1",6), 
(40, "id1",11), 
(50, "id1",1), 
(60, "id1",1), 
(70, "id1",11),
(10, "id2",1), 
(20, "id2",11), 
(30, "id2",1), 
(40, "id2",6), 
(50, "id2",1), 
(60, "id2",11), 
(70, "id2",6)).toDF("Time", "ID","Channel")

+----+---+-------+
|Time| ID|Channel|
+----+---+-------+
|  10|id1|      1|
|  20|id1|      6|
|  30|id1|      6|
|  40|id1|     11|
|  50|id1|      1|
|  60|id1|      1|
|  70|id1|     11|
|  10|id2|      1|
|  20|id2|     11|
|  30|id2|      1|
|  40|id2|      6|
|  50|id2|      1|
|  60|id2|     11|
|  70|id2|      6|
+----+---+-------+

Я бы хотел для каждого идентификатора посчитать количество изменений значения Channel, с течением времени. Чтобы получить результат как

+---+-----------------------+
| ID|NumberChannelChangement|
+---+-----------------------+
|id1|                      4|
|id2|                      6|
+---+-----------------------+

Я попытался преобразовать DataFrame в RDD и перебрать его. Я не получаю один и тот же результат от одного прогона к другому, когда использую один и тот же ввод.

Заранее спасибо за помощь

Ответы [ 2 ]

0 голосов
/ 11 января 2019

Использование spark-sql.

df.createOrReplaceTempView("PierreK ")
spark.sql(
  """  with t1 (select time,id, channel, lag(channel) over(partition by id order by time) chn_lag from pierrek)
       select id, sum( case when chn_lag is null then 0 when channel=chn_lag then 0 else 1 end) as NumberChannelChangement from t1 group by id
  """).show(false)

Результаты:

+---+-----------------------+
|id |NumberChannelChangement|
+---+-----------------------+
|id1|4                      |
|id2|6                      |
+---+-----------------------+
0 голосов
/ 11 января 2019

Вы можете сделать это, используя комбинацию аналитических функций (lag) для обнаружения изменений и groupBy для подсчета изменений:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

df
.withColumn("lag_Channel",lag($"Channel",1).over(Window.partitionBy($"ID").orderBy($"Time")))
.withColumn("change",coalesce($"Channel"=!=$"lag_channel",lit(false)))
.groupBy($"ID")
.agg(
  sum(when($"change",lit(1))).as("NumberChannelChangement")
)
.show()

+---+-----------------------+
| ID|NumberChannelChangement|
+---+-----------------------+
|id1|                      4|
|id2|                      6|
+---+-----------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...