У меня есть фрейм данных с тремя столбцами: id
, index
и value
.
+---+-----+-------------------+
| id|index| value|
+---+-----+-------------------+
| A| 1023|0.09938822262205915|
| A| 1046| 0.3110047630613805|
| A| 1069| 0.8486710971453512|
+---+-----+-------------------+
root
|-- id: string (nullable = true)
|-- index: integer (nullable = false)
|-- value: double (nullable = false)
Затем у меня есть еще один фрейм данных, который показывает желательные периоды для каждого id
:
+---+-----------+---------+
| id|start_index|end_index|
+---+-----------+---------+
| A| 1069| 1276|
| B| 2066| 2291|
| B| 1616| 1841|
| C| 3716| 3932|
+---+-----------+---------+
root
|-- id: string (nullable = true)
|-- start_index: integer (nullable = false)
|-- end_index: integer (nullable = false)
У меня есть три шаблона, как показано ниже
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)
Цель состоит в том, чтобы для каждой строки в dfIntervals
применить функцию (предположим, что это корреляция), в которой функция получает value
столбец из dfRaw
и три массива шаблонов и добавляет три столбца к dfIntervals
, каждый столбец связан с каждым шаблоном.
Допущения: 1 - Размеры массивов шаблонов составляют ровно 10.
2 - В столбце index
столбцов dfRaw
3 - start_index
и end_index
столбцов в dfIntervals
отсутствуют столбцы index
, а в столбце dfRaw
и при наличиировно 10 рядов между ними.Например, dfRaw.filter($"id" === "A").filter($"index" >= 1069 && $"index" <= 1276).count
(первая строка в dfIntervals) дает ровно 10
.
Вот код, который генерирует эти кадры данных:
import org.apache.spark.sql.functions._
val mySeed = 1000
/* Defining templates for correlation analysis*/
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)
/* Defining raw data*/
var dfRaw = Seq(
("A", (1023 to 1603 by 23).toArray),
("B", (341 to 2300 by 25).toArray),
("C", (2756 to 3954 by 24).toArray)
).toDF("id", "index")
dfRaw = dfRaw.select($"id", explode($"index") as "index").withColumn("value", rand(seed=mySeed))
/* Defining intervals*/
var dfIntervals = Seq(
("A", 1069, 1276),
("B", 2066, 2291),
("B", 1616, 1841),
("C", 3716, 3932)
).toDF("id", "start_index", "end_index")
В результате три столбца добавляются к dfIntervals
датафрейм с именами corr_w_template1
, corr_w_template2
и corr_w_template3
PS: я не смог найти корреляционную функцию в Scala.Давайте предположим, что такая функция существует (как показано ниже), и мы собираемся сделать из нее udf
.
def correlation(arr1: Array[Double], arr2: Array[Double]): Double