Создание строки JSON из двух столбцов в PySpark GroupBy - PullRequest
0 голосов
/ 18 февраля 2019

У меня есть фрейм данных, который выглядит так:

>>> l = [('a', 'foo', 1), ('b', 'bar', 1), ('a', 'biz', 6), ('c', 'bar', 3), ('c', 'biz', 2)]
>>> df = spark.createDataFrame(l, ('uid', 'code', 'level')) 
>>> df.show()
+---+----+-----+
|uid|code|level|
+---+----+-----+
|  a| foo|    1|
|  b| bar|    1|
|  a| biz|    6|
|  c| bar|    3|
|  c| biz|    2|
+---+----+-----+

Я пытаюсь сгруппировать значения code и level в list из dict ивыведите этот список в виде строки JSON, чтобы я мог сохранить фрейм данных на диск.Результат будет выглядеть примерно так:

>>> df.show()
+---+--------------------------+
|uid|           json           |
+---+--------------------------+
|  a| '[{"foo":1}, {"biz":6}]' |
|  b| '[{"bar":1}]'            |
|  c| '[{"bar":3}, {"biz":2}]' |
+---+--------------------------+

Я все еще довольно новичок в использовании PySpark, и у меня много проблем с выяснением, как получить этот результат.Я почти наверняка нуждаюсь в groupBy, и я попытался реализовать это, создав новый столбец StringType с именем "json", а затем с помощью декоратора pandas_udf, но я получаю ошибки о недоступных типах, потому что, как яВыяснилось, что способ доступа к данным - это доступ ко всему столбцу, а не только к строке.

>>> df = df.withColumn('json', F.list(''))
>>> schema = df.schema
>>> @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
..: def to_json(pdf):
..:     return pdf.assign(serial=json.dumps({pdf.code:pdf.level}))

Я рассмотрел использование объединения строк между двумя столбцами и использование collect_set, но эточувствует себя неправильно, так как у него есть потенциал для записи на диск того, что не может быть загружено JSON только потому, что оно имеет строковое представление.Любая помощь приветствуется.

1 Ответ

0 голосов
/ 18 февраля 2019

Нет необходимости в pandas_udf в этом случае.to_json, collect_list и create_map должны быть все, что вам нужно:

import pyspark.sql.functions as f

df.groupby('uid').agg(
  f.to_json(
    f.collect_list(
      f.create_map('code', 'level')
    )
  ).alias('json')
).show(3, False)
+---+---------------------+
|uid|json                 |
+---+---------------------+
|c  |[{"bar":3},{"biz":2}]|
|b  |[{"bar":1}]          |
|a  |[{"foo":1},{"biz":6}]|
+---+---------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...