Сначала создайте DataFrame
df_b = spark.createDataFrame([("A","2020-08-05"),("B","2020-08-01"),("B","2020-09-20"),("B","2020-12-31"),("C","2020-05-10")],[ "col1","col2"])
_w = W.partitionBy("col1").orderBy("col1")
df_b = df_b.withColumn("rn", F.row_number().over(_w))
Здесь c logi, чтобы выбрать второй элемент каждой группы, если какая-либо группа имеет более одной строки. Для этого мы можем сначала присвоить номер строки каждой группе, и мы выберем первый элемент каждой группы, где количество строк равно 1, и первые 2 строки каждой группы, где количество строк больше 1 в каждой группе.
case = F.expr("""
CASE WHEN rn =1 THEN 1
WHEN rn =2 THEN 1
END""")
df_b = df_b.withColumn('case_condition', case)
df_b = df_b.filter(F.col("case_condition") == F.lit("1"))
Промежуточный вывод
+----+----------+---+--------------+
|col1| col2| rn|case_condition|
+----+----------+---+--------------+
| B|2020-08-01| 1| 1|
| B|2020-09-20| 2| 1|
| C|2020-05-10| 1| 1|
| A|2020-08-05| 1| 1|
+----+----------+---+--------------+
Теперь, наконец, просто возьмите последний элемент каждой группы -
df = df_b.groupBy("col1").agg(F.last("col2").alias("col2")).orderBy("col1")
df.show()
+----+----------+
|col1| col2|
+----+----------+
| A|2020-08-05|
| B|2020-09-20|
| C|2020-05-10|
+----+----------+