группа искр и уменьшение на основе соседних рядов, а также одного ряда - PullRequest
0 голосов
/ 30 октября 2018

У меня есть данные, как показано ниже, и я хочу уменьшить их, комбинируя соседние строки, т.е. previous.close = current.open

val df = Seq(
  ("Ray","2018-09-01","2018-09-10"),
  ("Ray","2018-09-10","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-27"),
  ("Ray","2018-09-27","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-24","2018-09-28"),
  ("Scott","2018-09-28","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-15"),
  ("Scott","2018-10-15","2018-09-20")
)

Требуемый выход ниже:

  (("Ray","2018-09-01","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-24","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-20"))

Пока что я могу сжать соседние строки, используя приведенное ниже решение DF ().

df.alias("t1").join(df.alias("t2"),$"t1.name" === $"t2.name" and $"t1.close"=== $"t2.open" )
  .select("t1.name","t1.open","t2.close")
  .distinct.show(false) 

|name |open      |close     |
+-----+----------+----------+
|Scott|2018-09-24|2018-09-30|
|Scott|2018-10-11|2018-09-20|
|Ray  |2018-09-01|2018-09-15|
|Ray  |2018-09-21|2018-09-30|
+-----+----------+----------+

Я пытаюсь использовать похожий стиль для получения отдельных строк, давая $ "t1.close" =! = $ "T2.open", а затем объединяю оба, чтобы получить окончательный результат. Но я получаю нежелательные строки, которые не могу правильно отфильтровать. Как этого добиться?.

Этот пост отличается от Оконная функция Spark SQL со сложным условием , где он вычисляет дополнительный столбец даты как новый столбец.

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Вот один из подходов:

  1. Создать новый столбец temp1 со значением null, если текущий open равен предыдущему close; в противном случае значение тока open
  2. Создайте еще один столбец temp2, который будет заполнять null s в temp1 с ненулевым значением last
  3. Группировка результирующего набора данных по (name, temp2) для генерации непрерывных диапазонов дат

Я пересмотрел ваши выборочные данные, чтобы охватить случаи непрерывного диапазона дат более 2+ строк.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df = Seq(
  ("Ray","2018-09-01","2018-09-10"),
  ("Ray","2018-09-10","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-27"),
  ("Ray","2018-09-27","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-23","2018-09-28"),  // <-- Revised
  ("Scott","2018-09-28","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-15"),
  ("Scott","2018-10-15","2018-10-20")
).toDF("name", "open", "close")

val win = Window.partitionBy($"name").orderBy("open", "close")

val df2 = df.
  withColumn("temp1", when(
    row_number.over(win) === 1 || lag($"close", 1).over(win) =!= $"open", $"open")
  ).
  withColumn("temp2", last($"temp1", ignoreNulls=true).over(
    win.rowsBetween(Window.unboundedPreceding, 0)
  ))

df2.show
// +-----+----------+----------+----------+----------+
// | name|      open|     close|     temp1|     temp2|
// +-----+----------+----------+----------+----------+
// |Scott|2018-09-21|2018-09-23|2018-09-21|2018-09-21|
// |Scott|2018-09-23|2018-09-28|      null|2018-09-21|
// |Scott|2018-09-28|2018-09-30|      null|2018-09-21|
// |Scott|2018-10-05|2018-10-09|2018-10-05|2018-10-05|
// |Scott|2018-10-11|2018-10-15|2018-10-11|2018-10-11|
// |Scott|2018-10-15|2018-10-20|      null|2018-10-11|
// |  Ray|2018-09-01|2018-09-10|2018-09-01|2018-09-01|
// |  Ray|2018-09-10|2018-09-15|      null|2018-09-01|
// |  Ray|2018-09-16|2018-09-18|2018-09-16|2018-09-16|
// |  Ray|2018-09-21|2018-09-27|2018-09-21|2018-09-21|
// |  Ray|2018-09-27|2018-09-30|      null|2018-09-21|
// +-----+----------+----------+----------+----------+

Выше показан результат шага 1 и 2, в котором temp2 содержит значение самого раннего open соответствующего непрерывного диапазона дат. Шаг 3 использует max, чтобы получить самое позднее close диапазона дат:

df2.
  groupBy($"name", $"temp2".as("open")).agg(max($"close").as("close")).
  show
// +-----+----------+----------+
// |name |open      |close     |
// +-----+----------+----------+
// |Scott|2018-09-21|2018-09-30|
// |Scott|2018-10-05|2018-10-09|
// |Scott|2018-10-11|2018-10-20|
// |Ray  |2018-09-01|2018-09-15|
// |Ray  |2018-09-16|2018-09-18|
// |Ray  |2018-09-21|2018-09-30|
// +-----+----------+----------+
0 голосов
/ 30 октября 2018

ОБНОВЛЕНО: код проверен: -)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

val df = Seq(
  ("Ray","2018-09-01","2018-09-10"),
  ("Ray","2018-09-10","2018-09-15"),
  ("Ray","2018-09-16","2018-09-18"),
  ("Ray","2018-09-21","2018-09-27"),
  ("Ray","2018-09-27","2018-09-30"),
  ("Scott","2018-09-21","2018-09-23"),
  ("Scott","2018-09-23","2018-09-28"),  // <-- Revised
  ("Scott","2018-09-28","2018-09-30"),
  ("Scott","2018-10-05","2018-10-09"),
  ("Scott","2018-10-11","2018-10-15"),
  ("Scott","2018-10-15","2018-10-20")
).toDF("name", "open", "close")

val window = Window.partitionBy("name").orderBy($"open").rowsBetween(-1, Window.currentRow) //<- only compare the dates of a certain name, and for each row look also look at the previous one

df.select(
  $"name", $"open", $"close",
  min($"close").over(window) as "closeBefore_tmp"//<- get the smaller close value (that of the previous entry) 
)
.withColumn("closeBefore", when($"closeBefore_tmp" === $"close", null).otherwise($"closeBefore_tmp")) //<- in this case there was no previous row: its the first for this user, so set closeBefore to null
.createOrReplaceTempView("tmp")

Теперь вы можете compare открыть и closeBefore.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...