Добавить новый столбец с максимальным значением другого столбца в фреймворке pyspark - PullRequest
0 голосов
/ 12 июля 2020

Нужна помощь по pyspark df. Я пытаюсь добавить новый столбец с максимальным значением другого столбца в существующий фрейм данных, но получаю ошибку ниже. Вот что я делаю.

df1 = df.withColumn('WEEK_START_DATE', df.agg(f.max('DATE')))



error:
AttributeError: 'DataFrame' object has no attribute '_get_object_id'

1 Ответ

2 голосов
/ 12 июля 2020

Я не думаю, что мы можем использовать агрегатные функции в withColumn, но вот обходной путь для этого случая.

1.Using crossJoin:

from pyspark.sql.functions import *
df.show()    
#+---+----+
#| id|name|
#+---+----+
#|  1|   a|
#|  2|   b|
#|  3|   c|
#+---+----+
df1=df.agg(max('id'))
spark.sql("set spark.sql.crossJoin.enabled=true")
#cross join
df.join(df1)
#or
df.crossJoin(df1).show()
+---+----+-------+
#| id|name|max(id)|
#+---+----+-------+
#|  1|   a|      3|
#|  2|   b|      3|
#|  3|   c|      3|
#+---+----+-------+

2. Using Window function:

from pyspark.sql import *
import sys
w=Window.orderBy(monotonically_increasing_id()).rowsBetween(-sys.maxsize,sys.maxsize)
df.withColumn("max",max(col("id")).over(w)).show()
#+---+----+---+
#| id|name|max|
#+---+----+---+
#|  1|   a|  3|
#|  2|   b|  3|
#|  3|   c|  3|
#+---+----+---+

3. Using variable substitution:

max_value=df.agg(max("id")).collect()[0][0]

df.withColumn("max",lit(max_value)).show()

#or
max_value=lit(df.agg(max("id")).collect()[0][0])
type(max_value)
#<class 'pyspark.sql.column.Column'>
df.withColumn("max",max_value).show()
#+---+----+---+
#| id|name|max|
#+---+----+---+
#|  1|   a|  3|
#|  2|   b|  3|
#|  3|   c|  3|
#+---+----+---+

Using Spark-sql:

df.createOrReplaceTempView("tmp")
spark.sql("select * from tmp cross join (select max(id) max_val from tmp) t1").show()

spark.sql("select *,max(id) over(order by id rows between unbounded preceding and unbounded following) as max_val from tmp").show()

max_value=df.agg(max(col("id"))).collect()[0][0]
spark.sql("select *,{0} as max_val from tmp".format(max_value)).show()
#+---+----+-------+
#| id|name|max_val|
#+---+----+-------+
#|  1|   a|      3|
#|  2|   b|      3|
#|  3|   c|      3|
#+---+----+-------+
...