как установить идентификатор приращения над множеством строк относительно значения col в spark - PullRequest
0 голосов
/ 27 апреля 2018

Привет, у меня есть набор данных выглядит как:

мой вклад:

+----------+----------------+
|  id      |  flag          |
+----------+----------------|
|  1       | false          |  
+----------+----------------|
|  2       | true           | 
+----------+----------------|
|  3       | false          |
+----------+----------------|
|  4       | true           |  
+----------+----------------|
|  5       | false          |
+----------+----------------|
|  6       | false          |  
+----------+----------------|
|  7       | true           |
+----------+----------------+

вывод:

+----------+----------------+----------------------------+
|  id      |  flag          |  new_col                   |
+----------+---------------------------------------------+
|  1       | false          |      1                     |
+----------+---------------------------------------------+
|  2       | true           |      1                     |
+----------+----------------+----------------------------+
|  3       | false          |      3                     |
+----------+----------------+----------------------------+
|  4       | true           |      3                     |
+----------+----------------+----------------------------+
|  5       | false          |      5                     |
+----------+----------------+----------------------------+
|  6       | false          |      6                     |
+----------+----------------+----------------------------+
|  7       | true           |      6                     |
+----------+----------------+----------------------------+

каждое ложное значение изменит значение new_col на его id и так далее ... любая помощь, пожалуйста?

Ответы [ 2 ]

0 голосов
/ 27 апреля 2018

Если вы хотите пойти по пути rdd, вы можете передать все данные одному исполнителю и выполнить цикл для , как показано ниже

df.rdd.coalesce(1).mapPartitions(iterator => {
  var y = "1"
  for (x <- iterator) yield {
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if(flag == false){
      y = id
      newdf(id, flag, y)
    }else{
      newdf(id, flag, y)
    }
  }
}).toDF()

и для этого вам понадобится класс кейса

case class newdf(id:String, flag:Boolean, new_id:String)

Вы тоже можете обходиться без класса case , но я предпочитаю использовать case case

0 голосов
/ 27 апреля 2018

С набором данных меньшего размера вы можете сделать следующее:

  1. Используйте when - otherwise для с withColumn, чтобы создать новый столбец, который будет принимать значение id или null в зависимости от значения flag, что в SQL эквивалентно:
CASE WHEN FLAG = 'TRUE' THEN ID ELSE NULL END AS NEW_COL
  1. Затем используйте coalesce для замены всех нулей на last над окном, чтобы получить последнее ненулевое значение:
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//|  1|false|
//|  2| true|
//|  3| true|
//|  4| true|
//|  5|false|
//|  6| true|
//|  7| true|
//+---+-----+

//Defining a Window over which we will call the function
import org.apache.spark.sql.expressions.Window

//No partitionBy clause so all the data will move to a single partition
//You'll also get a warning related to that
val w = Window.orderBy($"id")

//The value of `id` will be the same where `flag` is `false`
//last will be called over the window to fill the null values   
df.withColumn("new_col" , when($"flag" === lit(false) , $"id").otherwise(null))
  .withColumn("new_col" , coalesce($"new_col" , last($"new_col", true).over(w) ) )
  .show   
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1  |false|1      |
//|2  |true |1      |
//|3  |true |1      |
//|4  |true |1      |
//|5  |false|5      |
//|6  |true |5      |
//|7  |true |5      |
//+---+-----+-------+
...