Мне нужно рассчитать дополнительные функции из набора данных, используя несколько опережений и задержек. Большое количество опережений и задержек приводит к ошибке нехватки памяти.
Фрейм данных:
|----------+----------------+---------+---------+-----+---------|
| DeviceID | Timestamp | Sensor1 | Sensor2 | ... | Sensor9 |
|----------+----------------+---------+---------+-----+---------|
| | | | | | |
| Long | Unix timestamp | Double | Double | | Double |
| | | | | | |
|----------+----------------+---------+---------+-----+---------|
Определение окна:
// Each window contains about 600 rows
val w = Window.partitionBy("DeviceID").orderBy("Timestamp")
Вычислить дополнительные функции:
var res = df
val sensors = (1 to 9).map(i => s"Sensor$i")
for (i <- 1 to 5) {
for (s <- sensors) {
res = res.withColumn(lag(s, i).over(w))
.withColumn(lead(s, i)).over(w)
}
// Compute features from all the lag's and lead's
[...]
}
Информация о системе:
RAM: 16G
JVM heap: 11G
Код дает правильные результаты с небольшими наборами данных, но выдает ошибку нехватки памяти с 10 ГБ входных данных.
Я думаю, что виновником является большое количество оконных функций, потому что DAG показывает очень длинную последовательность
Window -> WholeStageCodeGen -> Window -> WholeStageCodeGen ...
Есть ли способ рассчитать те же функции более эффективным способом?
Например, возможно ли получить задержку (Sensor1, 1), задержку (Sensor2, 1), ..., задержку (Sensor9, 1) без вызова задержки (..., 1) девять раз?
Если ответ на предыдущий вопрос - нет, то как мне избежать нехватки памяти? Я уже пытался увеличить количество разделов.