Объединить несколько рядов искр в один - PullRequest
0 голосов
/ 31 января 2020

У меня есть датафрейм, который выглядит как приведенный ниже. Все значения для соответствующего id одинаковы, за исключением поля mappingcol.

+--------------------+----------------+--------------------+-------+
|misc                |fruit           |mappingcol          |id     |
+--------------------+----------------+--------------------+-------+
|ddd                 |apple           |Map("name"->"Sameer"|     1 |
|ref                 |banana          |Map("name"->"Riyazi"|     2 |
|ref                 |banana          |Map("lname"->"Nikki"|     2 |
|ddd                 |apple           |Map("lname"->"tenka"|     1 |
+--------------------+----------------+--------------------+-------+

Я хочу объединить строки с одной и той же строкой так, чтобы я получил ровно одну строку на одну id, и значение mappingcol необходимо объединить. Вывод должен выглядеть следующим образом:

+--------------------+----------------+--------------------+-------+
|misc                |fruit           |mappingcol          |id     |
+--------------------+----------------+--------------------+-------+
|ddd                 |apple           |Map("name"->"Sameer"|     1 |
|ref                 |banana          |Map("name"->"Riyazi"|     2 |
+--------------------+----------------+--------------------+-------+

значение для mappingcol для id = 1 будет:

Map(
"name" -> "Sameer",
"lname" -> "tenka"
)

Я знаю, что карты можно объединить с помощью ++ оператор, так что это не то, что меня беспокоит. Я просто не могу понять, как объединить строки, потому что, если я использую groupBy, мне нечего агрегировать строки.

Ответы [ 2 ]

3 голосов
/ 31 января 2020

Вы можете использовать groupBy, а затем немного управлять картой

df.groupBy("id", "fruit", "misc").agg(collect_list("mappingcol"))
.as[(Int, String, String, Seq[Map[String, String]])]
.map { case (id, fruit, misc, list) => (id, fruit, misc, list.reduce(_ ++ _)) }
.toDF("id", "fruit", "misc", "mappingColumn")
  • В первой строке выберите группу по нужным столбцам и объедините пары карт в одном элементе (массиве). )
  • Со второй строкой (as) вы преобразуете свою структуру в набор данных Tuple4 с последним элементом, представляющим собой последовательность карт
  • С третьей строкой (map) вы объединяете все элементы на одной карте
  • С последней строкой (toDF) для присвоения столбцам исходных имен

ВЫХОД

+---+------+----+--------------------------------+
|id |fruit |misc|mappingColumn                   |
+---+------+----+--------------------------------+
|1  |apple |ddd |[name -> Sameer, lname -> tenka]|
|2  |banana|ref |[name -> Riyazi, lname -> Nikki]|
+---+------+----+--------------------------------+
1 голос
/ 01 апреля 2020

Вы определенно можете сделать вышеописанное с помощью функции Window!

Это в PySpark, а не Scala, но почти нет разницы, когда используются только собственные функции Spark.

Приведенный ниже код работает только для столбца карты, который содержит 1 ключ, пару значений на строку, как это показано в примере данных, но его можно настроить для работы со столбцами карты с несколькими записями.

from pyspark.sql import Window

map_col = 'mappingColumn'
group_cols = ['id', 'fruit', 'misc']
# or, a lazier way if you have a lot of columns to group on
cols = df.columns # save as list
group_cols_2 = cols.remove('mappingCol') # remove what you're not grouping by

w = Window.partitionBy(group_cols)

# unpack map value and key into a pair struct column
df1 = df.withColumn(map_col , F.struct(F.map_keys(map_col)[0], F.map_values(map_col)[0]))

# Collect all key values into an array of structs, here each row
# contains the map entries for all rows in the group/window
df1 = df1.withColumn(map_col , F.collect_list(map_col).over(w))

# drop duplicate values, as you only want one row per group
df1 = df1.dropDuplicates(group_cols)

# return the values for map type
df1 = df1.withColumn(map_col , F.map_from_entries(map_col))

Вы можете сохранить выходные данные каждого шага в новом столбце, чтобы увидеть, как работает каждый шаг, как я сделал ниже.

from pyspark.sql import Window

map_col = 'mappingColumn'
group_cols = list('id', 'fruit', 'misc')
w = Window.partitionBy(group_cols)

df1 = df.withColumn('test', F.struct(F.map_keys(map_col)[0], F.map_values(map_col)[0]))
df1 = df1.withColumn('test1', F.collect_list('test').over(w))
df1 = df1.withColumn('test2', F.map_from_entries('test1'))

df1.show(truncate=False)
df1.printSchema()

df1 = df1.dropDuplicates(group_cols)

...