pyspark / dataframe - создание вложенной структуры - PullRequest
0 голосов
/ 03 сентября 2018

Я использую pyspark с фреймом данных и хотел бы создать вложенную структуру, как показано ниже

До:

Column 1 | Column 2 | Column 3 
--------------------------------
A    | B   | 1 
A    | B   | 2 
A    | C   | 1 

После того, как:

Column 1 | Column 4 
--------------------------------
A    | [B : [1,2]] 
A    | [C : [1]]

Это выполнимо?

Ответы [ 2 ]

0 голосов
/ 03 сентября 2018

Я не думаю, что вы можете получить именно такой результат, но вы можете подойти ближе. Проблема заключается в именах ваших ключей для столбца 4. В Spark структурам необходимо иметь фиксированный набор заранее известных столбцов. Но оставим это на потом, во-первых, агрегация:

import pyspark
from pyspark.sql import functions as F

sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

data = [('A', 'B', 1), ('A', 'B', 2), ('A', 'C', 1)]
columns = ['Column1', 'Column2', 'Column3']

data = spark.createDataFrame(data, columns)

data.createOrReplaceTempView("data")
data.show()

# Result
+-------+-------+-------+
|Column1|Column2|Column3|
+-------+-------+-------+
|      A|      B|      1|
|      A|      B|      2|
|      A|      C|      1|
+-------+-------+-------+

nested = spark.sql("SELECT Column1, Column2, STRUCT(COLLECT_LIST(Column3) AS data) AS Column4 FROM data GROUP BY Column1, Column2")
nested.toJSON().collect()

# Result
['{"Column1":"A","Column2":"C","Column4":{"data":[1]}}',
 '{"Column1":"A","Column2":"B","Column4":{"data":[1,2]}}']

Что является почти тем, что вы хотите, верно? Проблема заключается в том, что если вы заранее не знаете названия своих ключей (то есть значений в столбце 2), Spark не сможет определить структуру ваших данных. Кроме того, я не совсем уверен, как вы можете использовать значение столбца в качестве ключа для структуры, если вы не используете UDF (возможно, с PIVOT?):

datatype = 'struct<B:array<bigint>,C:array<bigint>>'  # Add any other potential keys here.
@F.udf(datatype)
def replace_struct_name(column2_value, column4_value):
    return {column2_value: column4_value['data']}

nested.withColumn('Column5', replace_struct_name(F.col("Column2"), F.col("Column4"))).toJSON().collect()

# Output
['{"Column1":"A","Column2":"C","Column4":{"C":[1]}}',
 '{"Column1":"A","Column2":"B","Column4":{"B":[1,2]}}']

Это, конечно, имеет тот недостаток, что количество ключей должно быть дискретным и известным заранее, в противном случае другие значения ключей будут игнорироваться.

0 голосов
/ 03 сентября 2018

Во-первых, воспроизводимый пример вашего фрейма данных.

js = [{"col1": "A", "col2":"B", "col3":1},{"col1": "A", "col2":"B", "col3":2},{"col1": "A", "col2":"C", "col3":1}]
jsrdd = sc.parallelize(js)
sqlContext = SQLContext(sc)
jsdf = sqlContext.read.json(jsrdd)
jsdf.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   B|   1|
|   A|   B|   2|
|   A|   C|   1|
+----+----+----+

Теперь списки не сохраняются в виде пар ключ-значение. Вы можете использовать dictionary или простой collect_list() после выполнения группового запроса в столбце 2.

jsdf.groupby(['col1', 'col2']).agg(F.collect_list('col3')).show()
+----+----+------------------+
|col1|col2|collect_list(col3)|
+----+----+------------------+
|   A|   C|               [1]|
|   A|   B|            [1, 2]|
+----+----+------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...