pyspark создает словарные данные из pyspark sql dataframe - PullRequest
0 голосов
/ 02 июня 2018

Иметь pyspark.sql.dataframe.DataFrame со следующей структурой, и это продолжается для всех месяцев во всех странах, указанных ниже:

+----------+-------+------------------+
|DATE      |COUNTRY|AVG_TEMPS         |
+----------+-------+------------------+
|2007-01-01|Åland  |0.5939999999999999|
|2007-02-01|Åland  |-4.042            |
|2007-03-01|Åland  |2.443             |
|2007-04-01|Åland  |4.621             |
|2007-05-01|Åland  |8.411             |
|2007-06-01|Åland  |13.722999999999999|
|2007-07-01|Åland  |15.749            |
+----------+-------+------------------+

Ожидаемый результат - словарь Python, подобный приведенной ниже ссылке:

pyspark - создать столбцы DataFrame Grouping в структуре типа карты

-----------------------------------------
|    DATE  |        COUNTRY_TEMP        | 
-----------------------------------------
|2007-01-01|{Åland: 0.593, Alfredo:2.44}|
|2007-01-02| {Åland: 0.57, Alfredo:2.14}|
-----------------------------------------

Когда я пытаюсь следовать этому, я получаю некоторую ошибку

df_converted = newres.groupBy('DATE').\
    agg(collect_list(create_map(col("COUNTRY"))))

Ошибка:

AnalysisException: u"cannot resolve 'map(`COUNTRY`)' due to data type mismatch: map expects a positive even number of arguments.
;;\n'Aggregate [DATE#179], [DATE#179, collect_list(map(COUNTRY#180), 0, 0) AS collect_list(map(COUNTRY))#189]\n+- Project [DATE#146 AS DATE#179,
COUNTRY#85 AS COUNTRY#180, AVG_TEMPS#147 AS AVG_TEMPS#181]\n   +- Project [dt#82 AS DATE#146, COUNTRY#85, AverageTemperature#83 AS AVG_TEMPS#147]
\n      +- SubqueryAlias global_temps_by_cntry\n         +- Relation[dt#82,AverageTemperature#83,AverageTemperatureUncertainty#84,Country#85] csv\n"

Может кто-нибудь помочь pls ???

1 Ответ

0 голосов
/ 02 июня 2018

как упомянуто @ user3689574, попробуйте добавить значение в create_map:

df = spark.createDataFrame([('2007-01-01', 'Aland', 0.593), ('2007-01-01', 'Alfredo', 2.44),('2007-01-02', 'Aland', 2.57), ('2007-01-02', 'Alfredo', 2.14)], ['DATE', 'COUNTRY', 'AVG_TEMPS'])

df.show()

+----------+-------+---------+ 
| DATE     |COUNTRY|AVG_TEMPS|
+----------+-------+---------+ 
|2007-01-01|  Aland|    0.593|
|2007-01-01|Alfredo|     2.44| 
|2007-01-02|  Aland|     2.57| 
|2007-01-02|Alfredo|     2.14|
+----------+-------+---------+

from pyspark.sql.functions import collect_list, col, create_map
df2 = df.groupBy("DATE").agg(collect_list( create_map( func.col("COUNTRY"), col("AVG_TEMPS") ) ).alias("COUNTRY_TEMP"))

df2.show(4, False)

+----------+-------------------------------------+
|DATE      |COUNTRY_TEMP                         |
+----------+-------------------------------------+
|2007-01-01|[[Aland -> 0.593], [Alfredo -> 2.44]]| 
|2007-01-02|[[Aland -> 2.57], [Alfredo -> 2.14]] |
+----------+-------------------------------------+ 
...