Spark DataFrames: объединение двух последовательных строк - PullRequest
0 голосов
/ 23 декабря 2018

У меня есть DataFrame со следующей структурой:

|  id  |  time  |  x  |  y  |
-----------------------------
|  1   |   1    |  0  |  3  |
|  1   |   2    |  3  |  2  |
|  1   |   5    |  6  |  1  |
|  2   |   1    |  3  |  7  |
|  2   |   2    |  1  |  9  |
|  3   |   1    |  7  |  5  |
|  3   |   2    |  9  |  3  |
|  3   |   7    |  2  |  5  |
|  3   |   8    |  4  |  7  |
|  4   |   1    |  7  |  9  |
|  4   |   2    |  9  |  0  |

Я пытаюсь добиться того, чтобы для каждой записи было создано еще три столбца, содержащих time, x, y следующего (на основеtime).Уловка в том, что мы берем следующие записи, только если они имеют одинаковое значение id, в противном случае новые три столбца должны быть установлены на null

Вот вывод, который я пытаюсь получить

|  id  |  time  |  x  |  y  | time+1 | x+1 | y+1 |
--------------------------------------------------
|  1   |   1    |  0  |  3  |   2    |  3  |  2  |
|  1   |   2    |  3  |  2  |   5    |  6  |  1  |
|  1   |   5    |  6  |  1  |  null  | null| null|
|  2   |   1    |  3  |  7  |   2    |  1  |  9  |
|  2   |   2    |  1  |  9  |  null  | null| null|
|  3   |   1    |  7  |  5  |   2    |  9  |  3  |
|  3   |   2    |  9  |  3  |   7    |  2  |  5  |
|  3   |   7    |  2  |  5  |   8    |  4  |  7  |
|  3   |   8    |  4  |  7  |  null  | null| null|
|  4   |   1    |  7  |  9  |   2    |  9  |  0  |
|  4   |   2    |  9  |  0  |  null  | null| null|

Можно ли добиться этого с помощью Spark DataFrames?

Ответы [ 3 ]

0 голосов
/ 24 декабря 2018

Если вы знакомы с SQL, просто создайте временное представление и создайте все столбцы за один раз.Проверьте это

scala> val df = Seq((1,1,0,3),(1,2,3,2),(1,5,6,1),(2,1,3,7),(2,2,1,9),(3,1,7,5),(3,2,9,3),(3,7,2,5),(3,8,4,7),(4,1,7,9),(4,2,9,0)).toDF("id","time","x","y")
df: org.apache.spark.sql.DataFrame = [id: int, time: int ... 2 more fields]

scala> df.createOrReplaceTempView("m2008")

scala> spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """).show(false)
+---+----+---+---+------+----+----+
|id |time|x  |y  |timep1|xp1 |yp1 |
+---+----+---+---+------+----+----+
|1  |1   |0  |3  |2     |3   |2   |
|1  |2   |3  |2  |5     |6   |1   |
|1  |5   |6  |1  |null  |null|null|
|3  |1   |7  |5  |2     |9   |3   |
|3  |2   |9  |3  |7     |2   |5   |
|3  |7   |2  |5  |8     |4   |7   |
|3  |8   |4  |7  |null  |null|null|
|4  |1   |7  |9  |2     |9   |0   |
|4  |2   |9  |0  |null  |null|null|
|2  |1   |3  |7  |2     |1   |9   |
|2  |2   |1  |9  |null  |null|null|
+---+----+---+---+------+----+----+


scala>

Вы можете получить его как другой фрейм данных, просто присвоив результат spark.sql

scala> val df2 = spark.sql(""" select *, lead(time) over(partition by id order by time) timep1,lead(x) over(partition by id order by time) xp1, lead(y) over(partition by id order by time) yp1 from m2008 """)
df2: org.apache.spark.sql.DataFrame = [id: int, time: int ... 5 more fields]

scala> df2.printSchema
root
 |-- id: integer (nullable = false)
 |-- time: integer (nullable = false)
 |-- x: integer (nullable = false)
 |-- y: integer (nullable = false)
 |-- timep1: integer (nullable = true)
 |-- xp1: integer (nullable = true)
 |-- yp1: integer (nullable = true)


scala>
0 голосов
/ 02 января 2019

В scala вы также можете сделать следующее:

scala> import org.apache.spark.sql.expressions.Window

scala> valpart = Window.partitionBy ('id) .orderBy (' time)

scala> spark.read.format ("csv"). option ("inferSchema", "true"). option ("header"), правда) .load ("file: ///home/ec2-user/test.csv") .withColumn ("time1", опережать ('time, 1) по части) .withColumn ("x + 1", опережать('x, 1) над частью) .withColumn ("y + 1", лидерство (' y, 1) над частью) .show ()

вы также можете проверить снимок, который я посетилниже:

running snapshot of program using **windows lead function**

0 голосов
/ 23 декабря 2018

Вы можете использовать функцию окна управления.Сначала создайте окно, разделив его по столбцу id, а затем при вызове функции withColumn используйте столбец, который вы хотите отобразить со значением смещения как 1.

Примерно так:

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('id).orderBy('time)
dataset.withColumn("time1", lead('time, 1) over windowSpec).show

Вы можете добавить другиестолбцы таким же образом

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...