Spark: сложная работа с датафреймами - PullRequest
0 голосов
/ 18 февраля 2019

У меня есть входной набор данных в следующем формате:

+---+--------+----------+
| id|   refId| timestamp|
+---+--------+----------+
|  1|    null|1548944642|
|  1|29950529|1548937685|
|  2|27510720|1548944885|
|  2|27510720|1548943617|
+---+--------+----------+

Необходимо добавить новый столбец session со следующей логикой преобразования:

  1. Если refId is null значение сеанса истинно.
  2. Если id and refId are unique, значение сеанса равно true.
  3. Если id and refId are not unique и `timestamp больше, чем в предыдущей строке, значение сеанса равно true.Кроме того, разница между временными метками должна быть> 60.
+---+--------+-------+----------+
| id|   refId|session| timestamp|
+---+--------+-------+----------+
|  1|    null|   true|1548944642|
|  1|29950529|   true|1548937685|
|  2|27510720|  false|1548943617|
|  2|27510720|   true|1548944885|
+---+--------+-------+----------+

Я могу выполнять условия 1 и 3 отдельно, но не 2-е.

  1. `data.withColumn ("session", functions.when (data.col ("refId"). isNull (), true));
WindowSpec w = Window.partitionBy("id, refid").orderBy(timestampDS.col("timestamp"));
functions.coalesce(timestampDS.col("timestamp").cast("long").$minus(functions.lag("timestamp", 1).over(w).cast("long")), functions.lit(0));

Мой вопрос заключается в том, как выполнить 2-е условиеи реализовать все 3 преобразования вместе.

Ответы [ 2 ]

0 голосов
/ 18 февраля 2019

Вы можете использовать оконную функцию, чтобы идентифицировать groupBy и rfId и упорядочить по метке времени, а затем добавить столбец ранга.Наконец, вы добавляете столбец сеанса с помощью когда, иначе функция sql.

import org.apache.spark.sql.expressions.{Window}
import org.apache.spark.sql.functions.{when, col, rank, lit, lag}
val win = Window.partitionBy("id", "refId").orderBy("timestamp")
val result = df
      .withColumn("previous", lag("timestamp", 1) over win)
      .withColumn("rank", rank() over win)
      .withColumn("session",
        when(col("refId").isNull || col("rank") === lit(1), true)
          .otherwise(false)
      )
      .withColumn("diff", col("timestamp") - col("previous"))
0 голосов
/ 18 февраля 2019

Я бы сказал, используйте Spark SQL для меньшей сложности и легко достигните своего результата

df.createOrReplaceTempView("test")

spark.sql("select id,refId,timestamp,case when refId is null and id is not null then 'true' when id is not null and refId is not null and rank=1 then 'true' else 'false' end as session from  (select id,refId,timestamp, rank() OVER (PARTITION BY id,refId ORDER BY timestamp DESC) as rank from test) c").show()

Вывод выглядит так:

+---+--------+----------+-------+
| id|   refId| timestamp|session|
+---+--------+----------+-------+
|  1|    null|1548944642|   true|
|  1|29950529|1548937685|   true|
|  2|27510720|1548944885|   true|
|  2|27510720|1548943617|  false|
+---+--------+----------+-------+ 
...