Pyspark создать словарь в группе - PullRequest
1 голос
/ 23 марта 2019

Возможно ли в pyspark создать словарь в пределах groupBy.agg()?Вот игрушечный пример:

import pyspark
from pyspark.sql import Row
import pyspark.sql.functions as F

sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

toy_data = spark.createDataFrame([
    Row(id=1, key='a', value="123"),
    Row(id=1, key='b', value="234"),
    Row(id=1, key='c', value="345"),
    Row(id=2, key='a', value="12"),
    Row(id=2, key='x', value="23"),
    Row(id=2, key='y', value="123")])

toy_data.show()

+---+---+-----+
| id|key|value|
+---+---+-----+
|  1|  a|  123|
|  1|  b|  234|
|  1|  c|  345|
|  2|  a|   12|
|  2|  x|   23|
|  2|  y|  123|
+---+---+-----+

, и это ожидаемый результат :

---+------------------------------------
id |  key_value
---+------------------------------------
1  | {"a": "123", "b": "234", "c": "345"}
2  | {"a": "12", "x": "23", "y": "123"}
---+------------------------------------

======================================

Я пробовал это, но не работает.

toy_data.groupBy("id").agg(
    F.create_map(col("key"),col("value")).alias("key_value")
)

Этовыдает следующую ошибку:

AnalysisException: u"expression '`key`' is neither present in the group by, nor is it an aggregate function....

Ответы [ 2 ]

3 голосов
/ 23 марта 2019

Компонент agg должен содержать фактическую функцию агрегирования.Одним из способов решения этой проблемы является объединение collect_list

Функция агрегирования: возвращает список объектов с дубликатами.

struct:

Создает новый столбец структуры.

и map_from_entries

КоллекцияФункция: Возвращает карту, созданную из данного массива записей.

Вот как вы это сделаете:

toy_data.groupBy("id").agg(
    F.map_from_entries(
        F.collect_list(
            F.struct("key", "value"))).alias("key_value")
).show(truncate=False)
+---+------------------------------+
|id |key_value                     |
+---+------------------------------+
|1  |[a -> 123, b -> 234, c -> 345]|
|2  |[a -> 12, x -> 23, y -> 123]  |
+---+------------------------------+
0 голосов
/ 27 июня 2019

Для pyspark <2.4.0, где <code>pyspark.sql.functions.map_from_entries недоступен, вы можете использовать собственную созданную функцию udf

import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

@F.udf(returnType=MapType(StringType(), StringType()))
def map_array(column):
    return dict(column)

(toy_data.groupBy("id")
     .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
     .withColumn('key_value', map_array('key_value'))
     .show(truncate=False))
+---+------------------------------+
|id |key_value                     |
+---+------------------------------+
|1  |[a -> 123, b -> 234, c -> 345]|
|2  |[x -> 23, a -> 12, y -> 123]  |
+---+------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...