первая функция агрегата scala, дающая неожиданные результаты - PullRequest
0 голосов
/ 06 февраля 2019

Я использую простой групповой запрос в scala spark, цель которого - получить первое значение в группе в отсортированном кадре данных.Вот мой искровой фрейм данных

+---------------+------------------------------------------+
|ID             |some_flag |some_type  |  Timestamp        |
+---------------+------------------------------------------+
|      656565654|      true|     Type 1|2018-08-10 00:00:00|
|      656565654|     false|     Type 1|2017-08-02 00:00:00|
|      656565654|     false|     Type 2|2016-07-30 00:00:00|
|      656565654|     false|     Type 2|2016-05-04 00:00:00|
|      656565654|     false|     Type 2|2016-04-29 00:00:00|
|      656565654|     false|     Type 2|2015-10-29 00:00:00|
|      656565654|     false|     Type 2|2015-04-29 00:00:00|
+---------------+----------+-----------+-------------------+

Вот мой совокупный запрос

val sampleDF = df.sort($"Timestamp".desc).groupBy("ID").agg(first("Timestamp"), first("some_flag"), first("some_type"))

Ожидаемый результат:

+---------------+-------------+---------+-------------------+
|ID             |some_falg    |some_type|  Timestamp        |
+---------------+-------------+---------+-------------------+
|      656565654|         true|   Type 1|2018-08-10 00:00:00|
+---------------+-------------+---------+-------------------+

Но получается следующий странный вывод, и он продолжает менятьсякак случайная строка

+---------------+-------------+---------+-------------------+
|ID             |some_falg    |some_type|  Timestamp        |
+---------------+-------------+---------+-------------------+
|      656565654|        false|   Type 2|2015-10-29 00:00:00|
+---------------+-------------+---------+-------------------+

Также обратите внимание, что в кадре данных нет нулей.Я чешу голову там, где я делаю что-то не так.Нужна помощь!

Ответы [ 2 ]

0 голосов
/ 06 февраля 2019

Просто чтобы добавить к ответу Вамси;проблема в том, что значения в группе результатов groupBy не возвращаются в каком-либо определенном порядке (особенно учитывая распределенный характер операций Spark), поэтому функция first, возможно, названа неверно.Он возвращает первое ненулевое значение, которое он находит для этого столбца , т. Е. Практически любое ненулевое значение для этого столбца в группе.

Сортировка строк до того, как groupBy нене влияет на порядок в группе каким-либо воспроизводимым способом.

См. также сообщение в блоге , в котором объясняется, что из-за описанного выше поведения значения, которые вы получаете из нескольких вызовов firstможет даже не входить в один и тот же ряд в группе.

Входные данные с 3 столбцами «k, t, v»

z, 1, null
z, 2, 1.5
z, 3, 2.4

Код:

df.groupBy("k").agg(
  $"k",
  first($"t"),
  first($"v")
)

Вывод:

z, 1, 1.5

Этот результат представляет собой смесь 2 записей!

0 голосов
/ 06 февраля 2019

То, как вы пытаетесь получить все первые значения, возвращает неверный результат.Каждое значение столбца может быть из другой строки.

Вместо этого вам нужно только order by отметка времени в порядке убывания для каждой группы и получить первую строку.Самый простой способ сделать это - использовать функцию типа row_number.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val sampleDF = df.withColumn("rnum",row_number().over(Window.partitionBy(col("ID")).orderBy(col("Timestamp").desc)))

sampleDF.filter(col("rnum") == 1).show
.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...