Как объединить список в один список в pyspark - PullRequest
0 голосов
/ 30 августа 2018

В фрейме данных spark у меня есть 1 столбец, содержащий список в виде строк. Я хочу объединить список строк в одну.

INPUT DATAFRAME:
+-------+--------------------+
| name  |friends             |
+-------+--------------------+
| Jim   |[["C","A"]["B","C"]]|
+-------+--------------------+
| Bill  |[["E","A"]["F","L"]]|
+-------+--------------------+
| Kim   |[["C","K"]["L","G"]]| 
+-------+--------------------+

OUTPUT DATAFRAME:  

+-------+--------------------+
| name  |friends             |
+-------+--------------------+
| Jim   |["C","A","B"]       |
+-------+--------------------+
| Bill  |["E","A","F","L"]   |
+-------+--------------------+
| Kim   |["C","K","L","G"]   | 
+-------+--------------------+

Я хочу объединить список в один список и удалить дубликаты. Заранее спасибо

1 Ответ

0 голосов
/ 30 августа 2018

Я думаю, вы можете положиться на комбинацию explode для деконструкции списков и collect_set для их перестройки:

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Window

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

columns = ['name', 'friends']
data = [("Jim", [["C","A"], ["B","C"]]), ("Bill", [["E","A"], ["F","L"]]), ("Kim", [["C","K"], ["L","G"]])]
pd_data = pd.DataFrame.from_records(data=data, columns=columns)
spark_data = spark.createDataFrame(pd_data)

first_explode = spark_data.withColumn("first_explode", F.explode((F.col("friends"))))
first_explode.show()

+----+----------------+-------------+
|name|         friends|first_explode|
+----+----------------+-------------+
| Jim|[[C, A], [B, C]]|       [C, A]|
| Jim|[[C, A], [B, C]]|       [B, C]|
|Bill|[[E, A], [F, L]]|       [E, A]|
|Bill|[[E, A], [F, L]]|       [F, L]|
| Kim|[[C, K], [L, G]]|       [C, K]|
| Kim|[[C, K], [L, G]]|       [L, G]|
+----+----------------+-------------+

Первый уровень деконструирован. Теперь о втором:

second_explode = first_explode.withColumn("second_explode", F.explode(F.col("first_explode")))
second_explode.show()

+----+----------------+-------------+--------------+
|name|         friends|first_explode|second_explode|
+----+----------------+-------------+--------------+
| Jim|[[C, A], [B, C]]|       [C, A]|             C|
| Jim|[[C, A], [B, C]]|       [C, A]|             A|
| Jim|[[C, A], [B, C]]|       [B, C]|             B|
| Jim|[[C, A], [B, C]]|       [B, C]|             C|
|Bill|[[E, A], [F, L]]|       [E, A]|             E|
|Bill|[[E, A], [F, L]]|       [E, A]|             A|
|Bill|[[E, A], [F, L]]|       [F, L]|             F|
|Bill|[[E, A], [F, L]]|       [F, L]|             L|
| Kim|[[C, K], [L, G]]|       [C, K]|             C|
| Kim|[[C, K], [L, G]]|       [C, K]|             K|
| Kim|[[C, K], [L, G]]|       [L, G]|             L|
| Kim|[[C, K], [L, G]]|       [L, G]|             G|
+----+----------------+-------------+--------------+

Восстановить список, исключив дубликаты:

grouped = second_explode.groupBy("name").agg(F.collect_set(F.col("second_explode")).alias("friends"))
grouped.show()

+----+------------+
|name|     friends|
+----+------------+
| Jim|   [C, B, A]|
|Bill|[F, E, A, L]|
| Kim|[K, C, G, L]|
+----+------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...