Как добавить два столбца с разными строками со специальным условием? - PullRequest
0 голосов
/ 19 февраля 2019

Привет, у меня есть PySpark dataframe .Итак, я хотел бы добавить два столбца из разных строк со специальным условием.Один из столбцов является типом даты.

Вот пример данных:

--------------------------------
| flag|      date     |  diff  |
--------------------------------
| 1   |   2014-05-31  | 0      |
--------------------------------
| 2   |   2014-06-02  | 2      |
--------------------------------
| 3   |   2016-01-14  | 591    |
--------------------------------
| 1   |   2016-07-08  | 0      |
--------------------------------
| 2   |   2016-07-12  | 4      |
--------------------------------

В настоящее время я только знаю, как добавить два столбца, используя этот код:

from pyspark.sql.functions import expr
dataframe.withColumn("new_column", expr("date_add(date_column, int_column)"))

Ожидаемый результат :

Существует новый столбец с именем «new_date», который является результатом добавления столбца «diff» в «столбец даты».

Подвох в том, что есть специальное условие: если «flag» равен 1, «date» и «diff» происходят из одной строки, , если нет, «date» происходит из предыдущей строки .

Я знаю, что в этом случае мои данные должны быть правильно отсортированы.

Если кто-нибудь может мне помочь, я был бы очень благодарен.Спасибо.

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

На всякий случай у вас такая же проблема с ответом Ксавье.Идея та же, но я удалил некоторые ненужные условия для окна и исправил синтаксическую ошибку, а также ошибку date_add, с которой я столкнулся при попытке попробовать его версию.

from pyspark.sql.functions import *
df1 = spark.createDataFrame([(1,datetime.date(2014,5,31),0),(2,datetime.date(2014,6,2),2),(3,datetime.date(2016,1,14),591),(1,datetime.date(2016,7,8),0),(2,datetime.date(2016,7,12),4)], ["flag","date","diff"])

w = Window.orderBy(col("date"))
df1 = df1.withColumn('previous_date', lag('date', 1).over(w))
df1 = df1.withColumn('new_date',when(col('flag')==1,\
expr('date_add(date, diff)'))\
.otherwise(expr('date_add(previous_date,diff)'))).drop('previous_date')
df1.show()

Вывод:

+----+----------+----+----------+
|flag|      date|diff|  new_date|
+----+----------+----+----------+
|   1|2014-05-31|   0|2014-05-31|
|   2|2014-06-02|   2|2014-06-02|
|   3|2016-01-14| 591|2016-01-14|
|   1|2016-07-08|   0|2016-07-08|
|   2|2016-07-12|   4|2016-07-12|
+----+----------+----+----------+
0 голосов
/ 19 февраля 2019

Вам просто нужно создать столбец с предыдущей датой, используя Window, и создать новый столбец в зависимости от значения 'flag'

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window().partitionBy().orderBy(F.col('date'))

dataframe = dataframe.withColumn('previous_date', F.lag('date', 1).over(w))

dataframe = dataframe.withColumn('new_date',
                                 F.when(F.col('flag')==1,
                                        F.expr("date_add(previous_date, diff)")
                                        ).otherwise(F.expr("date_add(date, diff)"))
                                ).drop('previous_date')
...