Pyspark: изменить столбец в соответствии с условием - PullRequest
0 голосов
/ 05 октября 2019

У меня есть фрейм данных df1:

+-------------------+-----+
|      start_date   |value|
+-------------------+-----+
|2019-03-17 00:00:00|   35|
+-------------------+-----+
|2019-05-20 00:00:00|   40|
+-------------------+-----+
|2019-06-03 00:00:00|   10|
+-------------------+-----+
|2019-07-01 00:00:00|   12|
+-------------------+-----+

и другой фрейм данных df_date:

+-------------------+
|       date        |
+-------------------+
|2019-02-01 00:00:00|
+-------------------+
|2019-04-10 00:00:00|
+-------------------+
|2019-06-14 00:00:00|   
+-------------------+

Я сделал объединение, и теперь у меня есть df с датой, start_date и значениемно значение, которое я хочу, должно быть таким:

+-------------------+-------------------+-----+
|      start_date   |       date        |value|
+-------------------+-------------------+-----+
|2019-02-01 00:00:00|2019-03-17 00:00:00|    0|
+-------------------+-------------------+-----+
|2019-04-10 00:00:00|2019-05-20 00:00:00|   35|
+-------------------+-------------------+-----+
|2019-06-14 00:00:00|2019-06-03 00:00:00|   85|
+-------------------+-------------------+-----+ 

каждый раз, когда я должен сравнивать start_date с датой, если она отличается, я должен добавить предыдущее значение с моим значением, иначе я должен оставить предыдущее значение
У меня уже естьновый фрейм данных с объединением в Pyspark и попыткой получить новое значение

Я использовал этот код для получения результатов

win = Window.partitionBy().orderBy("date")
df = df.withColumn("prev_date", F.lag(F.col("start_date")).over(win))
df = df.fillna({'prev_date': 0})

df = df.withColumn("value",F.when(F.isnull( F.lag(F.col("value"), 1).over(win)),df.value).when(df.start_date != df.prev_date,df.value + F.lag(F.col("value"), 1).over(win)) .otherwise(F.lag(F.col("value"),1).over(win)))
df.show(df.count(),False) 

Проблема в том, что изменения выполняются вв то же время, и мне нужно предыдущее значение каждый раз

Спасибо

1 Ответ

1 голос
/ 06 октября 2019

Вот код, который делает то, что вы хотите.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# step 1: init dataframes

cols = ["start_date", "value"]
data = [["2019-03-17 00:00:00", 35],
["2019-05-20 00:00:00", 40],
        ["2019-06-03 00:00:00", 10],
        ["2019-07-01 00:00:00", 12],
        ]
df = spark.createDataFrame(data, cols)

additional_dates = spark.createDataFrame([["2019-02-01 00:00:00"], ["2019-04-10 00:00:00"], ["2019-06-14 00:00:00"]], ["date"])

# step 2 calculate correct values.
# This is done by joining the df to the additinal dates and summing all values per 'date'
additional_dates = additional_dates.join(df, F.col("date") > F.col("start_date"), "left_outer").fillna(0, subset="value")
additional_dates = additional_dates.groupBy("date").agg(F.sum("value").alias("value"))
# at this point you already have 'date' + the correct value. you only need to join back the original date column

# step 3 get back the original date column
# we do this by joining on the row_number
# note that spark does not have an easy operation for adding a column from another dataframe
window_df = Window.orderBy("start_date")
window_add = Window.orderBy("date")

df = df.withColumn("row_number", F.row_number().over(window_df))
additional_dates = additional_dates.withColumn("row_number", F.row_number().over(window_add))

df = df.drop("value").join(additional_dates, "row_number").drop("row_number")
df.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...