SparkSQL: как суммировать два набора данных временных рядов с разными временными метками - PullRequest
0 голосов
/ 13 октября 2018

У меня есть два набора данных данных временных рядов. Мне нужно суммировать эти два набора данных, используя, вероятно, какой-то подход оконного подхода Временные метки различны для двух наборов данных Результатом будетсумма полей «значения» из обоих наборов данных, которая попадает в окно результирующего набора данных.

Есть ли в Spark встроенные функции, позволяющие сделать это легко? Или как мне добиться этого наилучшим образом

DataSet-1 
 raw_data_field_id | date_time_epoch | value
-------------------+-----------------+-----------
                23 |   1528766100068 |       131
                23 |   1528765200058 | 130.60001
                23 |   1528764300049 |     130.3
                23 |   1528763400063 |       130
                23 |   1528762500059 | 129.60001
                23 |   1528761600050 |     129.3
                23 |   1528760700051 | 128.89999
                23 |   1528759800047 | 128.60001

DataSet-2
 raw_data_field_id | date_time_epoch | value
-------------------+-----------------+-----------
                24 |   1528766100000 |       41
                24 |   1528765200000 |       60
                24 |   1528764300000 |       30.03
                24 |   1528763400000 |       43
                24 |   1528762500000 |       34.01
                24 |   1528761600000 |       29.36
                24 |   1528760700000 |       48.99
                24 |   1528759800000 |       28.01

1 Ответ

0 голосов
/ 13 октября 2018

Ее пример

scala> d1.show
+-----------------+--------------------+---------+
|raw_data_field_id|     date_time_epoch|    value|
+-----------------+--------------------+---------+
|               23|2018-06-12 01:15:...|    131.0|
|               23|2018-06-12 01:00:...|130.60001|
|               23|2018-06-12 00:45:...|    130.3|
|               23|2018-06-12 00:30:...|    130.0|
|               23|2018-06-12 00:15:...|129.60001|
|               23|2018-06-12 00:00:...|    129.3|
|               23|2018-06-11 23:45:...|128.89999|
|               23|2018-06-11 23:30:...|128.60001|
+-----------------+--------------------+---------+


scala> d2.show
+-----------------+--------------------+-----+
|raw_data_field_id|     date_time_epoch|value|
+-----------------+--------------------+-----+
|               24|2018-06-12 01:15:...| 41.0|
|               24|2018-06-12 01:00:...| 60.0|
|               24|2018-06-12 00:45:...|30.03|
|               24|2018-06-12 00:30:...| 43.0|
|               24|2018-06-12 00:15:...|34.01|
|               24|2018-06-12 00:00:...|29.36|
|               24|2018-06-11 23:45:...|48.99|
|               24|2018-06-11 23:30:...|28.01|
+-----------------+--------------------+-----+
scala> d1.unionAll(d2).show
+-----------------+--------------------+---------+
|raw_data_field_id|     date_time_epoch|    value|
+-----------------+--------------------+---------+
|               23|2018-06-12 01:15:...|    131.0|
|               23|2018-06-12 01:00:...|130.60001|
|               23|2018-06-12 00:45:...|    130.3|
|               23|2018-06-12 00:30:...|    130.0|
|               23|2018-06-12 00:15:...|129.60001|
|               23|2018-06-12 00:00:...|    129.3|
|               23|2018-06-11 23:45:...|128.89999|
|               23|2018-06-11 23:30:...|128.60001|
|               24|2018-06-12 01:15:...|     41.0|
|               24|2018-06-12 01:00:...|     60.0|
|               24|2018-06-12 00:45:...|    30.03|
|               24|2018-06-12 00:30:...|     43.0|
|               24|2018-06-12 00:15:...|    34.01|
|               24|2018-06-12 00:00:...|    29.36|
|               24|2018-06-11 23:45:...|    48.99|
|               24|2018-06-11 23:30:...|    28.01|
+-----------------+--------------------+---------+
import org.apache.spark.sql.functions.window
val df = d1.union(d2)
val avg_df = df.groupBy(window($"date_time_epoch", "15 minutes")).agg(avg($"value"))
avg_df.show
+--------------------+-----------------+
|              window|       avg(value)|
+--------------------+-----------------+
|[2018-06-11 23:45...|        88.944995|
|[2018-06-12 00:30...|             86.5|
|[2018-06-12 01:15...|             86.0|
|[2018-06-11 23:30...|        78.305005|
|[2018-06-12 00:00...|79.33000000000001|
|[2018-06-12 00:45...|           80.165|
|[2018-06-12 00:15...|        81.805005|
|[2018-06-12 01:00...|        95.300005|
+--------------------+-----------------+
avg_df.sort("window.start").select("window.start","window.end","avg(value)").show(truncate = false)
+-------------------+-------------------+-----------------+
|start              |end                |avg(value)       |
+-------------------+-------------------+-----------------+
|2018-06-11 23:30:00|2018-06-11 23:45:00|78.305005        |
|2018-06-11 23:45:00|2018-06-12 00:00:00|88.944995        |
|2018-06-12 00:00:00|2018-06-12 00:15:00|79.33000000000001|
|2018-06-12 00:15:00|2018-06-12 00:30:00|81.805005        |
|2018-06-12 00:30:00|2018-06-12 00:45:00|86.5             |
|2018-06-12 00:45:00|2018-06-12 01:00:00|80.165           |
|2018-06-12 01:00:00|2018-06-12 01:15:00|95.300005        |
|2018-06-12 01:15:00|2018-06-12 01:30:00|86.0             |
+-------------------+-------------------+-----------------+
...