Как объединить два столбца в один в PySpark? - PullRequest
2 голосов
/ 10 января 2020

У меня есть следующий PySpark DataFrame:

id   col1   col2
A    2      3
A    2      4
A    4      6
B    1      2

Я хочу составить col1 и col2, чтобы получить один столбец следующим образом:

id   col3
A    2   
A    3
A    4
A    6
B    1
B    2

Как я могу сделать это?

df = (
    sc.parallelize([
        (A, 2, 3), (A, 2, 4), (A, 4, 6),
        (B, 1, 2),
    ]).toDF(["id", "col1", "col2"])
)

Ответы [ 3 ]

2 голосов
/ 10 января 2020

Самое простое - объединить col1 и col2 в столбец массива, а затем explode it:

df.show()
+---+----+----+
| id|col1|col2|
+---+----+----+
|  A|   2|   3|
|  A|   2|   4|
|  A|   4|   6|
|  B|   1|   2|
+---+----+----+

df.selectExpr('id', 'explode(array(col1, col2))').show()
+---+---+
| id|col|
+---+---+
|  A|  2|
|  A|  3|
|  A|  2|
|  A|  4|
|  A|  4|
|  A|  6|
|  B|  1|
|  B|  2|
+---+---+

Вы можете удалить дубликаты, если они вам не нужны.

1 голос
/ 10 января 2020

Скорее простое решение, если количество задействованных столбцов меньше.

df = (
    sc.parallelize([
        ('A', 2, 3), ('A', 2, 4), ('A', 4, 6),
        ('B', 1, 2),
    ]).toDF(["id", "col1", "col2"])
)


df.show()

+---+----+----+
| id|col1|col2|
+---+----+----+
|  A|   2|   3|
|  A|   2|   4|
|  A|   4|   6|
|  B|   1|   2|
+---+----+----+

df1 = df.select(['id', 'col1'])
df2 = df.select(['id', 'col2']).withColumnRenamed('col2', 'col1')

df_new = df1.union(df2)
df_new = df_new.drop_duplicates()
df_new.show()

+---+----+
| id|col1|
+---+----+
|  A|   3|
|  A|   4|
|  B|   1|
|  A|   6|
|  A|   2|
|  B|   2|
+---+----+
1 голос
/ 10 января 2020

Для этого сгруппируйте по "id", затем соберите списки из обоих "col1" и "col2" в агрегации, чтобы затем снова разложить его в один столбец. Чтобы получить уникальные числа, просто отбросьте дубликаты после.

Я вижу, что у вас также есть числа, отсортированные в конечном результате, это делается путем сортировки составных списков в агрегации.

Следующий код:

from pyspark.sql.functions import concat, collect_list, explode, col, sort_array

df = (
    sc.parallelize([
        ('A', 2, 3), ('A', 2, 4), ('A', 4, 6),
        ('B', 1, 2),
    ]).toDF(["id", "col1", "col2"])
)

result = df.groupBy("id") \
.agg(sort_array(concat(collect_list("col1"),collect_list("col2"))).alias("all_numbers")) \
.orderBy("id") \
.withColumn('number', explode(col('all_numbers'))) \
.dropDuplicates() \
.select("id","number") \
.show()

даст:

+---+------+
| id|number|
+---+------+
|  A|     2|
|  A|     3|
|  A|     4|
|  A|     6|
|  B|     1|
|  B|     2|
+---+------+
...