У меня есть фрейм данных со сложной структурой. Внизу внутри этой структуры мне нужно заменить значение другим на основе сопоставления из другого фрейма данных. В настоящее время мы достигли sh этого, взорвав фрейм данных, присоединившись, а затем выполнив группирование с агрегацией. Проблема в том, что мы превращаем записи 3.5B в записи 210B. Стоимость группировки очень высока. И я начинаю с данных, которые уже сгруппированы, как я хочу. Был ли какой-то способ выполнить это, не взорвавшись и не сгруппировавшись?
Вот пример кода из блокнота Zeppelin для иллюстрации нашего текущего метода:
import spark.implicits._
case class A(device_id: Long, cluster: Seq[B])
case class B(location_id: Long, score: Double)
case class C(location_id: Long, location_key: String)
case class D(location_key: String, score: Double)
val df1 = Seq(
A(1L, Seq(B(1L, 1.1), B(2L, 2.2), B(3L, 3.3))),
A(2L, Seq(B(4L, 4.4), B(5L, 5.5), B(6L, 6.6))),
A(3L, Seq(B(7L, 7.7), B(8L, 8.8), B(9L, 9.9)))
).toDF
val df2 = Seq(
C(1L, "a"),
C(2L, "b"),
C(3L, "c"),
C(4L, "d"),
C(5L, "e"),
C(6L, "f"),
C(7L, "g"),
C(8L, "h"),
C(9L, "i")
).toDF
val df3 = df1
.select($"device_id", explode($"cluster").as("record"))
.select($"device_id", $"record.location_id".as("location_id"), $"record.score".as("score"))
val df4 = df3
.join(df2, "location_id")
.select($"device_id", $"location_key", $"score")
val df5 = df4
.groupBy($"device_id")
.agg(
collect_list(struct($"location_key", $"score")).as("cluster")
)
df1.printSchema()
df1.show(3, false)
df5.printSchema()
df5.show(3, false)
Вывод выглядит так:
root
|-- device_id: long (nullable = false)
|-- cluster: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- location_id: long (nullable = false)
| | |-- score: double (nullable = false)
+---------+------------------------------+
|device_id|cluster |
+---------+------------------------------+
|1 |[[1, 1.1], [2, 2.2], [3, 3.3]]|
|2 |[[4, 4.4], [5, 5.5], [6, 6.6]]|
|3 |[[7, 7.7], [8, 8.8], [9, 9.9]]|
+---------+------------------------------+
root
|-- device_id: long (nullable = false)
|-- cluster: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- location_key: string (nullable = true)
| | |-- score: double (nullable = true)
+---------+------------------------------+
|device_id|cluster |
+---------+------------------------------+
|1 |[[a, 1.1], [c, 3.3], [b, 2.2]]|
|2 |[[e, 5.5], [d, 4.4], [f, 6.6]]|
|3 |[[g, 7.7], [h, 8.8], [i, 9.9]]|
+---------+------------------------------+