Проверьте код ниже.
scala>
df
.groupBy($"Age_Group")
.agg(collect_set($"Book_Author").as("Book_Author"))
.select(map($"Age_Group",$"Book_Author").as("data"))
.show(false)
+--------------------------------------------------+
|data |
+--------------------------------------------------+
|[E -> [Iseult Teran, W.P.Kinsella, W. P Kinsella]]|
|[A -> [Jason Shinder, C.S Lewis]] |
+--------------------------------------------------+
Write to HDFS
df
.groupBy($"Age_Group")
.agg(collect_set($"Book_Author").as("Book_Author"))
.select(map($"Age_Group",$"Book_Author").as("data"))
.write
.format("orc")
.save("<hdfs_path>")
Удаление дубликатов
Например, имена Book_Author, указанные ниже, совпадают.
Вышеупомянутые имена почти одинаковы. Код ниже удаляет дубликаты и сохраняет только W.P.Kinsella
scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._
scala> df.show(false)
+---------+-------------+
|Age_Group|Book_Author |
+---------+-------------+
|E |W.P.Kinsella |
|E |W. P Kinsella|
|E |Iseult Teran |
|A |C.S Lewis |
|A |Jason Shinder|
+---------+-------------+
scala> val windowExpr = first($"Book_Author")
.over(
Window
.partitionBy(lower(regexp_replace($"Book_Author","[ .]","")))
.orderBy($"Age_Group".asc)
)
scala> df
.withColumn("Book_Author",windowExpr)
.groupBy($"Age_Group")
.agg(collect_set($"Book_Author").as("Book_Author"))
.select(map($"Age_Group",$"Book_Author").as("data"))
.show(false)
+-----------------------------------+
|data |
+-----------------------------------+
|[E -> [Iseult Teran, W.P.Kinsella]]|
|[A -> [Jason Shinder, C.S Lewis]] |
+-----------------------------------+