функция запаздывания с условием в искре - PullRequest
0 голосов
/ 14 июня 2019

Я использую искру, я хочу считать истинные столбцы, которые предшествуют ложным это мой стол:

условие

если d_difference> 900, то его считают ложным, в противном случае истина

id          date1               date2            d_difference    status    
534     18/03/15 11:50:30  18/03/15 11:50:45         15          true
534     18/03/15 11:50:50  18/03/15 11:50:59         09          true
534     18/03/15 12:00:30  18/03/15 12:35:45         1815        false
534     18/03/15 12:00:50  18/03/15 12:36:45         2165        false
534     18/03/15 12:37:30  18/03/15 12:37:45         15          true
534     18/03/15 12:38:00  18/03/15 12:38:10         10          true
534     18/03/15 12:42:30  18/03/15 12:50:45         495         true
534     18/03/15 12:50:50  18/03/15 01:15:45         1505        false
534     18/03/15 12:50:30  18/03/15 12:50:55         20          true


val v2 = v1.withColumn("status",when($"d_difference" > 900,false).otherwise(true))

v2.withcolumn("lag_data",when ($"staus" === false ,lag("status",1).over(w)).otherwise(null)).show()

Я использую функцию запаздывания для вычисления предыдущего истинного состояния, которое предшествует ложному, но оно не выполнено ...

Ответы [ 3 ]

0 голосов
/ 14 июня 2019

я могу использовать функцию 2 временного окна, чтобы вычислить все ложные

val w = Window.partitionBy("id").orderBy("date1","date2")
 val w1 = Window.partitionBy("id","status").orderBy("date1","date2")
 val r1 = ($"status" !== lag($"status", 1).over(w) && $"status").cast("bigint")
 v2.withColumn("new_session",r1)
 val t1 = v2.withColumn("session",sum(r1).over(w1)).show()

я получаю общее количество ложных значений, которое предшествует истине.

0 голосов
/ 14 июня 2019

Если вам нужна совокупная сумма True до False, вы можете обратиться ниже код:

    scala> import org.apache.spark.sql.expressions.Window
    scala> val w = Window.partitionBy("id").orderBy("id")
    scala> val w1 = Window.partitionBy("id").orderBy("rn")

 //Input data frame
    scala> df.show()
    +---+-----------------+-----------------+------------+------+
    | id|            date1|            date2|d_difference|status|
    +---+-----------------+-----------------+------------+------+
    |534|18/03/15 11:50:30|18/03/15 11:50:45|          15|  true|
    |534|18/03/15 11:50:50|18/03/15 11:50:59|          09| false|
    |534|18/03/15 12:00:30|18/03/15 12:35:45|        1815| false|
    |534|18/03/15 12:00:50|18/03/15 12:36:45|        2165| false|
    |534|18/03/15 12:37:30|18/03/15 12:37:45|          15|  true|
    |534|18/03/15 12:38:00|18/03/15 12:38:10|          10| false|
    |534|18/03/15 12:42:30|18/03/15 12:50:45|         495|  true|
    |534|18/03/15 12:50:50|18/03/15 01:15:45|        1505| false|
    |534|18/03/15 12:50:30|18/03/15 12:50:55|          20|  true|
    +---+-----------------+-----------------+------------+------+

    scala> val df1 = df.withColumn("rn", row_number over(w))
    scala> val df2 = df1.filter(col("status") === "false").withColumn("prv_rn",  lag("rn" ,1,0) over (w))

    scala> val df3 = df2.withColumn("sum", (col("rn") - col("prv_rn") - 1)).withColumn("true_count", sum(col("sum")) over(w1)).select("id","date1","date2","status","true_count")

  //Join final output
    scala> df.join(df3, Seq("id","date1","date2","status"),"left").show()
    +---+-----------------+-----------------+------+------------+----------+
    | id|            date1|            date2|status|d_difference|true_count|
    +---+-----------------+-----------------+------+------------+----------+
    |534|18/03/15 11:50:30|18/03/15 11:50:45|  true|          15|      null|
    |534|18/03/15 11:50:50|18/03/15 11:50:59| false|          09|         1|
    |534|18/03/15 12:00:30|18/03/15 12:35:45| false|        1815|         1|
    |534|18/03/15 12:00:50|18/03/15 12:36:45| false|        2165|         1|
    |534|18/03/15 12:37:30|18/03/15 12:37:45|  true|          15|      null|
    |534|18/03/15 12:38:00|18/03/15 12:38:10| false|          10|         2|
    |534|18/03/15 12:42:30|18/03/15 12:50:45|  true|         495|      null|
    |534|18/03/15 12:50:50|18/03/15 01:15:45| false|        1505|         4|
    |534|18/03/15 12:50:30|18/03/15 12:50:55|  true|          20|      null|
    +---+-----------------+-----------------+------+------------+----------+
0 голосов
/ 14 июня 2019

Ниже решение - просто идея, о которой я мог подумать. У меня нет IDE для тестирования и запуска. Просто пытаться помочь, кстати, мог придумать.

Допустим, у вас есть статус столбца со значениями true и false. Мы должны посчитать количество истин до первого ложного. Единственный способ, которым я могу думать о СДР:

Получить статус контакта rdd. Пусть день rddOfStatus.

rddOfStats.zipWithIndex().filter(condition).first
  1. zipWithIndex будет индексировать каждый статус как "Истинный", 0 "Истинный", 1 "Ложь", 2
  2. отфильтруйте это и получите первый элемент (tple), который является

"ложь", 2

  1. из этого кортежа мы можем извлечь значение и получить счет.

ИЛИ То же самое можно сделать с помощью Dataframe или Spark SQl.

  1. Добавить столбец с rownum для присвоения номеров каждой строке,

  2. , затем отфильтруйте все true и оставьте только false

  3. затем получите минимальный номер строки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...