Spark SQL: Агрегировать с временным окном - PullRequest
0 голосов
/ 18 мая 2018

У меня есть данные, упорядоченные по метке времени, со следующей структурой:

+------------+--------+--------+----------+-------+
| timestamp  |  value | device | subgroup | group |
+------------+--------+--------+----------+-------+
| 1377986440 |      0 |      1 |        0 |     5 |
| 1377986440 |   2.25 |      1 |        0 |     5 |
| 1377986440 |      0 |      2 |        0 |     6 |
| 1377986440 |  0.135 |      0 |        0 |     6 |
| 1377986440 |  0.355 |      0 |        0 |     6 |
+------------+--------+--------+----------+-------+

Я уже изменил метку времени с Long на TimestampType, и записи идут с 1 сентября 2013 года по 30 сентября 2013 года.

Мне нужно вычислить среднее и стандартное отклонение для значения по всему набору данных в следующих временных окнах: [00:00, 6:00), [06: 00,12: 00), [12:00, 18:00), [18: 00,00: 00) и для каждой группы.Например, вывод должен быть таким:

+-------+--------------+------+-------+
| group |   timeSlot   |  avg |  std  |
+-------+--------------+------+-------+
|     0 | 00:00 6:00   |  1.4 |  0.25 |
|     0 | 06:00 12:00  |  2.4 |  0.25 |
|   ... | ...          | .... |  .... |
|     3 | 00:00 6:00   |  2.3 |   0.1 |
|     3 | 06:00 12:00  |  0.0 |   0.0 |
|   ... | ...          |  ... |   ... |
+-------+--------------+------+-------+

Я попытался использовать окно, как объяснено здесь , поэтому я преобразовал свою метку времени Unix в TimestampType с форматом ЧЧ: мм: сс,Тогда мой код:

val data = df
  .select("*")
  .withColumn("timestamp", from_unixtime($"timestamp", "HH:mm:ss"))

 val res = data.select("*")
  .groupBy($"group", window($"timestamp", "6 hours", "6 hours"))
  .agg(avg("value"), stddev("value"))
  .orderBy("group")

Однако первый временной интервал начинается не с 00:00, а с 02:00:00, даже если я не указал начальную точку для окна.Вывод, который я получаю:

+--------+------------------------------------------+---------------------+---------------------+
|group   |window                                    |avg(cons)            |stddev_samp(cons)    |
+--------+------------------------------------------+---------------------+---------------------+
|0       |[2018-05-18 02:00:00, 2018-05-18 08:00:00]|1.781448366186445E-4 |0.004346229072242386 |
|0       |[2018-05-18 14:00:00, 2018-05-18 20:00:00]|0.0045980360360061865|0.7113464184007158   |
|0       |[2018-05-18 20:00:00, 2018-05-19 02:00:00]|2.7686190915763437E-4|6.490469208721791E-4 |
|0       |[2018-05-17 20:00:00, 2018-05-18 02:00:00]|0.0016399597206953798|0.12325297254169619  |
|0       |[2018-05-18 08:00:00, 2018-05-18 14:00:00]|2.3354306613988956E-4|5.121337883543223E-4 |
|1       |[2018-05-18 20:00:00, 2018-05-19 02:00:00]|8.319111249637333E-4 |0.00163300686441327  |
|1       |[2018-05-18 14:00:00, 2018-05-18 20:00:00]|0.006463708881068344 |0.7907138759032012   |
|1       |[2018-05-18 02:00:00, 2018-05-18 08:00:00]|6.540241054052753E-4 |0.020490123866864617 |

Как мне изменить мой код?Я пробовал другие решения, но ни одно не работает

1 Ответ

0 голосов
/ 18 мая 2018

Возможно, вы неправильно настроили настройки часового пояса.Двухчасовая смена предполагает, что вы используете GMT ​​+ 2 или эквивалент.

Если вы используете Spark 2.3 (или более позднюю версию), вы можете просто установить часовой пояс в своем коде (или конфигурации):

spark.conf.set("spark.sql.session.timeZone", "UTC")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...