PySpark: создание столбца с количеством временных шагов к событию - PullRequest
0 голосов
/ 02 октября 2018

У меня есть фрейм данных, который выглядит следующим образом:

|id |val1|val2|
+---+----+----+
|1  |1   |0   |
|1  |2   |0   |
|1  |3   |0   |
|1  |4   |0   |
|1  |5   |5   |
|1  |6   |0   |
|1  |7   |0   |
|1  |8   |0   |
|1  |9   |9   |
|1  |10  |0   |
|1  |11  |0   |
|2  |1   |0   |
|2  |2   |0   |
|2  |3   |0   |
|2  |4   |0   |
|2  |5   |0   |
|2  |6   |6   |
|2  |7   |0   |
|2  |8   |8   |
|2  |9   |0   |
+---+----+----+
only showing top 20 rows

Я хочу создать новый столбец с количеством строк, пока в val2 не появится ненулевое значение, это должно быть сделано groupby / partitionby'id' ... если событие никогда не происходит, мне нужно поставить -1 в поле шагов.

|id |val1|val2|steps|
+---+----+----+----+
|1  |1   |0   |4   |
|1  |2   |0   |3   |
|1  |3   |0   |2   |
|1  |4   |0   |1   |
|1  |5   |5   |0   | event
|1  |6   |0   |3   |
|1  |7   |0   |2   |
|1  |8   |0   |1   |
|1  |9   |9   |0   | event
|1  |10  |0   |-1  | no further events for this id
|1  |11  |0   |-1  | no further events for this id
|2  |1   |0   |5   |
|2  |2   |0   |4   |
|2  |3   |0   |3   |
|2  |4   |0   |2   |
|2  |5   |0   |1   |
|2  |6   |6   |0   | event
|2  |7   |0   |1   |
|2  |8   |8   |0   | event
|2  |9   |0   |-1  | no further events for this id
+---+----+----+----+
only showing top 20 rows

1 Ответ

0 голосов
/ 02 октября 2018

Ваше требование кажется простым, но реализация в spark и с сохранением неизменности является сложной задачей.Я предлагаю вам использовать рекурсивную функцию для генерации столбца steps.Ниже я попытался предложить вам рекурсивный способ с использованием функции udf.

import org.apache.spark.sql.functions._
//udf function to populate step column
def stepsUdf = udf((values: Seq[Row]) => {

  //sorting the collected struct in reverse order according to val1 column in reverse order
  val val12 = values.sortWith(_.getAs[Int]("val1") > _.getAs[Int]("val1"))
  //selecting the first of sorted list
  val val12Head = val12.head
  //generating the first step column in the collected list
  val prevStep = if(val12Head.getAs("val2") != 0) 0 else -1
  //generating the first output struct
  val listSteps = List(steps(val12Head.getAs("val1"), val12Head.getAs("val2"), prevStep))

  //recursive function for generating the step column
  def recursiveSteps(vals : List[Row], previousStep: Int, listStep : List[steps]): List[steps] = vals match {
    case x :: y =>
          //event changed so step column should be 0
          if(x.getAs("val2") != 0) {
          recursiveSteps(y, 0, listStep :+ steps(x.getAs("val1"), x.getAs("val2"), 0))
        }
            //event doesn't change after the last event change
          else if(x.getAs("val2") == 0 && previousStep == -1) {
          recursiveSteps(y, previousStep, listStep :+ steps(x.getAs("val1"), x.getAs("val2"), previousStep))
        }
            //val2 is 0 after the event change so increament the step column
          else {
          recursiveSteps(y, previousStep+1, listStep :+ steps(x.getAs("val1"), x.getAs("val2"), previousStep+1))
        }
    case Nil => listStep
  }

  //calling the recursive function
  recursiveSteps(val12.tail.toList, prevStep, listSteps)
})


df
  .groupBy("id")   // grouping by id column
  .agg(stepsUdf(collect_list(struct("val1", "val2"))).as("stepped"))  //calling udf function after the collection of struct of val1 and val2 
  .withColumn("stepped", explode(col("stepped")))   // generating rows from the list returned from udf function
  .select(col("id"), col("stepped.*"))           // final desired output
  .sort("id", "val1")     //optional step just for viewing
  .show(false)

где steps - это класс дела

case class steps(val1: Int, val2: Int, steps: Int)

, который должен дать вам

+---+----+----+-----+
|id |val1|val2|steps|
+---+----+----+-----+
|1  |1   |0   |4    |
|1  |2   |0   |3    |
|1  |3   |0   |2    |
|1  |4   |0   |1    |
|1  |5   |5   |0    |
|1  |6   |0   |3    |
|1  |7   |0   |2    |
|1  |8   |0   |1    |
|1  |9   |9   |0    |
|1  |10  |0   |-1   |
|1  |11  |0   |-1   |
|2  |1   |0   |5    |
|2  |2   |0   |4    |
|2  |3   |0   |3    |
|2  |4   |0   |2    |
|2  |5   |0   |1    |
|2  |6   |6   |0    |
|2  |7   |0   |1    |
|2  |8   |8   |0    |
|2  |9   |0   |-1   |
+---+----+----+-----+

Надеюсь, ответ полезен

...