У меня есть фрейм данных, который выглядит так:
>>> l = [('a', 'foo', 1), ('b', 'bar', 1), ('a', 'biz', 6), ('c', 'bar', 3), ('c', 'biz', 2)]
>>> df = spark.createDataFrame(l, ('uid', 'code', 'level'))
>>> df.show()
+---+----+-----+
|uid|code|level|
+---+----+-----+
| a| foo| 1|
| b| bar| 1|
| a| biz| 6|
| c| bar| 3|
| c| biz| 2|
+---+----+-----+
Я пытаюсь сгруппировать значения code
и level
в list
из dict
ивыведите этот список в виде строки JSON, чтобы я мог сохранить фрейм данных на диск.Результат будет выглядеть примерно так:
>>> df.show()
+---+--------------------------+
|uid| json |
+---+--------------------------+
| a| '[{"foo":1}, {"biz":6}]' |
| b| '[{"bar":1}]' |
| c| '[{"bar":3}, {"biz":2}]' |
+---+--------------------------+
Я все еще довольно новичок в использовании PySpark, и у меня много проблем с выяснением, как получить этот результат.Я почти наверняка нуждаюсь в groupBy
, и я попытался реализовать это, создав новый столбец StringType
с именем "json", а затем с помощью декоратора pandas_udf
, но я получаю ошибки о недоступных типах, потому что, как яВыяснилось, что способ доступа к данным - это доступ ко всему столбцу, а не только к строке.
>>> df = df.withColumn('json', F.list(''))
>>> schema = df.schema
>>> @pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
..: def to_json(pdf):
..: return pdf.assign(serial=json.dumps({pdf.code:pdf.level}))
Я рассмотрел использование объединения строк между двумя столбцами и использование collect_set
, но эточувствует себя неправильно, так как у него есть потенциал для записи на диск того, что не может быть загружено JSON только потому, что оно имеет строковое представление.Любая помощь приветствуется.