Для этого сгруппируйте по "id", затем соберите списки из обоих "col1" и "col2" в агрегации, чтобы затем снова разложить его в один столбец. Чтобы получить уникальные числа, просто отбросьте дубликаты после.
Я вижу, что у вас также есть числа, отсортированные в конечном результате, это делается путем сортировки составных списков в агрегации.
Следующий код:
from pyspark.sql.functions import concat, collect_list, explode, col, sort_array
df = (
sc.parallelize([
('A', 2, 3), ('A', 2, 4), ('A', 4, 6),
('B', 1, 2),
]).toDF(["id", "col1", "col2"])
)
result = df.groupBy("id") \
.agg(sort_array(concat(collect_list("col1"),collect_list("col2"))).alias("all_numbers")) \
.orderBy("id") \
.withColumn('number', explode(col('all_numbers'))) \
.dropDuplicates() \
.select("id","number") \
.show()
даст:
+---+------+
| id|number|
+---+------+
| A| 2|
| A| 3|
| A| 4|
| A| 6|
| B| 1|
| B| 2|
+---+------+