Если df_sd
не будет огромным списком, и у вас будет spark2.4
, вы можете сделать это, создав новый столбец в df со списком дней (1 , 2,3), а затем используйте groupBy
, collect_list
, arrays_zip
, & explode
. OrderBy перед groupBy существует, чтобы гарантировать, что список будет собран в правильном порядке.
df.show()
+----+------+----+
|Name|Number|days|
+----+------+----+
| A| 100| 1|
| A| 200| 2|
| B| 300| 1|
| B| 400| 2|
| B| 500| 3|
| C| 600| 1|
+----+------+----+
STANDARD_TENORS #-> [1, 2, 3]
#-> should be ordered
from pyspark.sql import functions as F
df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))\
.orderBy("Name","days")\
.groupBy("Name").agg(F.collect_list("Number").alias("Number")\
,F.first("days2").alias("days"))\
.withColumn("zipped", F.explode(F.arrays_zip("Number","days")))\
.select("Name","zipped.*").orderBy("Name","days").show()
+----+------+----+
|Name|Number|days|
+----+------+----+
| A| 200| 1|
| A| 100| 2|
| A| null| 3|
| B| 300| 1|
| B| 400| 2|
| B| 500| 3|
| C| 600| 1|
| C| null| 2|
| C| null| 3|
+----+------+----+
Если вы хотите использовать join
, вы можете сделать это аналогичным образом:
from pyspark.sql import functions as F
df_sd.agg(F.collect_list("days").alias("days")).join(\
df.orderBy("Name","days").groupBy("Name")\
.agg(F.collect_list("Number").alias("Number"),F.collect_list("days").alias("days1")),\
F.size("days")>=F.size("days1")).drop("days1")\
.withColumn("zipped", F.explode(F.arrays_zip("Number","days")))\
.select("Name","zipped.*")\
.orderBy("Name","days")\
.show()
UPDATE
:
Обновлено Для обработки любого ордера или для любого значения в Number
.. Я мог бы сделать код немного более краткий, но я сохранил его таким, чтобы вы могли видеть все те столбцы, которые я использовал, чтобы понять логи c. Не стесняйтесь задавать любые вопросы.
df.show()
#newsampledataframe
+----+------+----+
|Name|Number|days|
+----+------+----+
| A| 100| 1|
| A| 200| 2|
| B| 300| 1|
| B| 400| 2|
| B| 500| 3|
| C| 600| 3|
+----+------+----+
#STANDARD_TENORS = [1, 2, 3]
from pyspark.sql import functions as F
df.withColumn("days2", F.array(*[F.lit(x) for x in STANDARD_TENORS]))\
.groupBy("Name").agg(F.collect_list("Number").alias("col1")\
,F.first("days2").alias("days2"),F.collect_list("days").alias("x"))\
.withColumn("days3", F.arrays_zip(F.col("col1"),F.col("x")))\
.withColumn("days4", F.array_except("days2","x"))\
.withColumn("day5", F.expr("""transform(days4,x-> struct(bigint(-1),x))"""))\
.withColumn("days3", F.explode(F.array_union("days3","day5"))).select("Name","days3.*")\
.withColumn("Number", F.when(F.col("col1")==-1, F.lit(None)).otherwise(F.col("col1"))).drop("col1")\
.select("Name", "Number", F.col("x").alias("days"))\
.orderBy("Name","days")\
.show(truncate=False)