Как узнать дату начала и окончания в PySpark? - PullRequest
0 голосов
/ 24 февраля 2020

Ниже у меня есть Sprame dataframe (articleDF1), я пытаюсь добавить два столбца даты начала и окончания, используя столбец Date, в Dataframe и группировать итоговый кадр данных по post_evar10. Конечный фрейм данных будет иметь post_evar10, дату начала и дату окончания

 -------+--------------------+
|      Date|         post_evar10|
+----------+--------------------+
|2019-09-02|www:/espanol/recu...|
|2019-09-02|www:/caregiving/h...|
|2019-12-15|www:/health/condi...|
|2019-09-01|www:/caregiving/h...|
|2019-08-31|www:/travel/trave...|
|2020-01-20|www:/home-family/...|

Что я пробовал:

from pyspark.sql import functions as f
articleDF3 = articleDF1.withColumn('Start_Date', f.min(f.col('Date'))).withColumn('Start_Date', f.max(f.col('Date'))).groupBy(f.col("post_evar10")).drop("Date")

Ошибка получения: org. apache .spark. sql .AnalysisException: последовательность выражений группировки пуста, а 'temp.ms_article_lifespan_final. Date' не является агрегатной функцией. Оберните '(min (temp.ms_article_lifespan_final. Date) AS Start_Date)' в оконных функциях или оберните 'temp.ms_article_lifespan_final. Date' в first () (или first_value), если вам все равно какое значение вы получите. ;;

1 Ответ

1 голос
/ 24 февраля 2020

Является ли это ожидаемым результатом?

Чтобы получить min, max для каждой строки, мы можем использовать функцию window и получить min,max, затем сгруппировать и объединить, чтобы получить мин, макс. значения!

Example:

import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import *

#Sample data
df=sc.parallelize([('2019-09-02','www:/espanol/r'),('2019-09-02','www:/caregiving/h'),('2019-12-15','www:/health/condi')]).toDF(['Date','post_evar10']).withColumn("Date",col("Date").cast("Date"))

#window on all rows
w = Window.orderBy("Date").rowsBetween(-sys.maxsize, sys.maxsize)
#or
w = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("min_Date",min("Date").over(w)).\ #get min value for Date
withColumn("max_Date",max("Date").over(w)).\ #get max value for Date
groupBy("post_evar10").\ #groupby on post_evar10
agg(min("min_Date").alias("Start_date"),max("max_Date").alias("End_date")).\ #get min,max
show()

#+-----------------+----------+----------+
#|      post_evar10|Start_date|  End_date|
#+-----------------+----------+----------+
#|   www:/espanol/r|2019-09-02|2019-12-15|
#|www:/caregiving/h|2019-09-02|2019-12-15|
#|www:/health/condi|2019-09-02|2019-12-15|
#+-----------------+----------+----------+

(или)

By using first,last functions over window:

df.withColumn("min_Date",first("Date").over(w)).\
withColumn("max_Date",last("Date").over(w)).\
groupBy("post_evar10").\
agg(min("min_Date").alias("Start_date"),max("max_Date").alias("End_date")).\
show()

Generate min,max for each post_evar10 unique value:

w = Window.partitionBy('post_evar10').orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df=sc.parallelize([('2019-09-02','www:/espanol/r'),('2019-09-02','www:/caregiving/h'),('2019-09-03','www:/caregiving/h'),('2019-12-15','www:/health/condi')]).toDF(['Date','post_evar10']).withColumn("Date",col("Date").cast("Date"))

df.groupBy("post_evar10").\
agg(min("Date").alias("Start_date"),max("Date").alias("End_date")).\
show()

#+-----------------+----------+----------+
#|      post_evar10|Start_date|  End_date|
#+-----------------+----------+----------+
#|www:/health/condi|2019-12-15|2019-12-15|
#|   www:/espanol/r|2019-09-02|2019-09-02|
#|www:/caregiving/h|2019-09-02|2019-09-03|
#+-----------------+----------+----------+
...