Для spark2.4+
очень масштабируемый способ (without any join)
будет использовать groupBy
, collect_list
и array function
/ higher order functions
для определения отсутствующих ссылок, добавьте эти ссылки и затем explode.
. Он будет работать для любого порядка отсутствует A,B or C.
#sampledataframe
#df.show()
#+---+----+----------+
#| id|test| date|
#+---+----+----------+
#| 1| A|01/20/2020|
#| 2| A|02/20/2020|
#| 2| B|02/25/2020|
#| 2| C|02/25/2020|
#+---+----+----------+
from pyspark.sql import functions as F
df.groupBy("id").agg(F.collect_list("test").alias("x"),F.collect_list("date").alias("col2"))\
.withColumn("zip", F.arrays_zip(F.col("x"),F.col("col2")))\
.withColumn("except", F.array_except(F.array(*(F.lit(x) for x in ['A','B','C'])),"x")).drop("x","col2")\
.withColumn("except", F.expr("""transform(except,x-> struct(x,'NA'))"""))\
.withColumn("zipped", F.explode(F.array_union("zip","except")))\
.select("id",F.col("zipped.x").alias("test"),F.col("zipped.col2").alias("date"))\
.show(truncate=False)
#+---+----+----------+
#|id |test|date |
#+---+----+----------+
#|1 |A |01/20/2020|
#|1 |B |01/25/2020|
#|1 |C |NA |
#|2 |A |02/20/2020|
#|2 |B |02/25/2020|
#|2 |C |02/25/2020|
#+---+----+----------+
Physical Plan for Join Solution:
(как показано @cPak)
== Physical Plan ==
Sort [id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST, date#1339 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST, date#1339 ASC NULLS FIRST, 200), [id=#1110]
+- *(6) Project [id#1206L, test#1207, date#1339]
+- SortMergeJoin [id#1206L, test#1207], [id#1337L, test#1338], LeftOuter
:- Sort [id#1206L ASC NULLS FIRST, test#1207 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#1206L, test#1207, 200), [id=#1101]
: +- CartesianProduct
: :- Coalesce 1
: : +- *(2) HashAggregate(keys=[id#1206L], functions=[])
: : +- Exchange hashpartitioning(id#1206L, 200), [id=#1089]
: : +- *(1) HashAggregate(keys=[id#1206L], functions=[])
: : +- *(1) Project [id#1206L]
: : +- *(1) Scan ExistingRDD[id#1206L,test#1207,date#1208]
: +- Coalesce 1
: +- *(4) HashAggregate(keys=[test#1207], functions=[])
: +- Exchange hashpartitioning(test#1207, 200), [id=#1095]
: +- *(3) HashAggregate(keys=[test#1207], functions=[])
: +- *(3) Project [test#1207]
: +- *(3) Scan ExistingRDD[id#1206L,test#1207,date#1208]
+- Sort [id#1337L ASC NULLS FIRST, test#1338 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#1337L, test#1338, 200), [id=#1104]
+- *(5) Filter (isnotnull(id#1337L) && isnotnull(test#1338))
+- *(5) Scan ExistingRDD[id#1337L,test#1338,date#1339]
Physical Plan for Non-Join solution(using array functions+explode)
== Physical Plan ==
*(2) Project [id#1206L, zipped#1400.x AS test#1405, zipped#1400.col2 AS date#1406]
+- *(2) Generate explode(array_union(zip#1380, except#1394)), [id#1206L], false, [zipped#1400]
+- *(2) Project [id#1206L, arrays_zip(x#1374, col2#1376) AS zip#1380, transform(array_except([A,B,C], x#1374), lambdafunction(named_struct(x, lambda x#1395, col2, NA), lambda x#1395, false)) AS except#1394]
+- ObjectHashAggregate(keys=[id#1206L], functions=[collect_list(test#1207, 0, 0), collect_list(date#1208, 0, 0)])
+- Exchange hashpartitioning(id#1206L, 200), [id=#1192]
+- *(1) Project [id#1206L, date#1208, test#1207]
+- *(1) Scan ExistingRDD[id#1206L,test#1207,date#1208]
Как мы видим, решение объединения производит много обменов данными в случайном порядке и использует декартово произведение (крайне неэффективно для больших данных). ). Движение данных будет чрезвычайно высоким и обременительным для любого кластера. По сравнению с решением для массива / разнесения, перемещение данных будет намного ниже, а обработка будет намного быстрее.