Как я могу использовать выход агрегации в качестве входных данных с withColumn - PullRequest
0 голосов
/ 04 марта 2020

Я пытаюсь профилировать некоторые данные, используя фреймы данных pyspark, которые содержат строки, временные метки, целые числа и числа с плавающей точкой.

toy df:

sdf1 = 
|id1|id2|id3|
+---+---+---+
| 1 |"a"| 4 |
+---+---+---+
| 2 |"a"| 6 |
+---+---+---+
| 1 |"a"| 7 |
+---+---+---+
| 3 |"a"| 9 |
+---+---+---+


sdf2 = 
|ids|
+---+
|id1|
+---+
|id2|
+---+
|id3|
+---+

Я пытаюсь добиться следующего

agg_instructions = [F.max(x).alias("{0}".format(x)) for x in sdf1.columns]

sdf3 = sdf2.withColumn("max", sdf1.agg(*agg_instructions))

Какой приведет к следующему кадру данных. Однако это не работает - любая работа?

sdf3 = 
|ids|max|
+---+---+
|id1| 3 |
+---+---+
|id2|"a"|
+---+---+
|id3| 9 |
+---+---+

Я получаю следующую ошибку:

AssertionError Traceback (последний последний вызов) в () 7 agg_instructions = [F .max (x) .alias ("{0}". format (x)) для x в data_sdf.columns] 8 ----> 9 sdf3 = sdf2.withColumn ("max", sdf1.agg (* agg_instructions) ) 10 11 test = test.reset_index ()

/ databricks / spark / python / pyspark / sql / dataframe.py в withColumn (self, colName, col) 2011 2012 "" "-> 2013 asin isinstance ( col, Column), «col должно быть Column» 2014 возвращение DataFrame (self._jdf.withColumn (colName, col._j c), self.sql_ctx) 2015

AssertionError: col должно быть Column

1 Ответ

2 голосов
/ 04 марта 2020

Это слишком много для того, чего вы хотите достичь. Вы можете получить желаемый результат от sdf1.

Одним из способов является создание столбца массива, который содержит структуры имени столбца и соответствующие им максимальные значения. Затем взорвите его и выберите поля структуры.

Вот пример:

data = [(1, "a", 4), (2, "a", 6), (1, "a", 7), (3, "a", 9)]
df = spark.createDataFrame(data, ["id1", "id2", "id3"])

agg_instructions = array(
        *[struct(lit(c).alias("ids"), max(col(c)).cast("string").alias("max")) for c in df.columns]
    )

df.agg(agg_instructions.alias("agg")) \
  .withColumn("agg", explode(col("agg"))) \
  .select("agg.*") \
  .show()

#+---+---+
#|ids|max|
#+---+---+
#|id1|3  |
#|id2|a  |
#|id3|9  |
#+---+---+
...