Заполнение новых столбцов на основе условий в pyspark - PullRequest
0 голосов
/ 22 сентября 2019

У меня есть фрейм данных в pyspark, как показано ниже.

df = sqlContext.createDataFrame(
[
("101", "99.10", "2019-06-04"), 
("102", "89.27", "2019-06-04"), 
("102", "89.10", "2019-03-04"), 
("103", "73.11", "2019-09-10"), 
("101", "-69.81", "2019-09-11"), 
("101", "12.51", "2018-12-14"), 
("101", "43.23", "2018-09-11")
], 
("user_id", "amount", "trans_date"))

Я хочу выяснить следующие вещи

1) Сначала найдите max of trans_date в фрейме данныхи хранить как переменную.Я получил это, делая так, как показано ниже

from pyspark.sql import functions as f
from pyspark.sql import Window
max_date=df.groupby().agg(f.max('trans_date')).collect()[0].asDict()['max(trans_date)']

2) Сумма amount для каждого user_id

df1=df.withColumn('Balance', f.sum('amount').over(Window.partitionBy('user_id')))

3) Теперь, используя это max_date Я хочу создать несколькостолбцы и заполнить значения, например

`days_0_30` if `trans_date` is between `max_date` and `30 days before
`days_31_60` if `trans_date` is between `max_date - 31 days` and `60 days before
and so on. I am able to do it using below

df2 = df1.withColumn(days_0_30, f.when((df1.trans_date <= '2019-09-11') & (df1.trans_date >= '2019-06-11'), df1.Balance).otherwise('null')).withColumn(days_31_60, f.when((df1.trans_date <= '2019-06-10') & (df1.trans_date >= '2019-03-11'), df1.Balance).otherwise('null')).withColumn(days_61_90, f.when((df1.trans_date <= '2019-03-10') & (df1.trans_date >= '2018-12-11'), df1.Balance).otherwise('null')).withColumn(days_91_120, f.when((df1.trans_date <= '2018-12-10') & (df1.trans_date >= '2018-09-11'), df1.Balance).otherwise('null')).withColumn(days_121_150, f.when((df1.trans_date <= '2018-09-10') & (df1.trans_date >= '2018-06-11'), df1.Balance).otherwise('null'))

Если date is not in range defined значения должны быть null

Но у меня есть жестко закодированные даты в приведенном выше.Я хотел бы, чтобы это динамически достигало моего результата.

Что я должен сделать, чтобы достичь своего результата более элегантным способом

1 Ответ

2 голосов
/ 22 сентября 2019

Просто используйте списки:

Следуйте инструкциям, чтобы получить df1 (убедитесь, что в столбце trans_date указан DateType ())

from pyspark.sql import functions as f, Window
from datetime import timedelta

df = df.withColumn('trans_date', f.to_date('trans_date'))

max_date = df.select(f.max('trans_date').alias('max_date')).first().max_date
# datetime.date(2019, 9, 11)

df1 = df.withColumn('balance', f.round(f.sum('amount').over(Window.partitionBy('user_id')),2))

Затем используйте списки для создания спискакортежа с тремя элементами (range_name, range_start_date, range_end_date)

dranges = [
    ('days_{}_{}'.format(i*30+1,(i+1)*30), max_date-timedelta(days=(i+1)*30), max_date-timedelta(days=i*30+1))
         for i in range(5)
]
dranges 
#[('days_1_30', datetime.date(2019, 8, 12), datetime.date(2019, 9, 10)),
# ('days_31_60', datetime.date(2019, 7, 13), datetime.date(2019, 8, 11)),
# ('days_61_90', datetime.date(2019, 6, 13), datetime.date(2019, 7, 12)),
# ('days_91_120', datetime.date(2019, 5, 14), datetime.date(2019, 6, 12)),
# ('days_121_150', datetime.date(2019, 4, 14), datetime.date(2019, 5, 13))]

"""will need to adjust the first element since it does not follow the same rules as other ranges:"""
dranges[0] = ('days_0_30', dranges[0][1], max_date)

Теперь используйте понимание списка для генерации этих новых столбцов:

df2 = df1.select('*', *[
        f.when((df1.trans_date >= d[1]) & (df1.trans_date <= d[2]), df1.balance).otherwise(None).alias(d[0])
            for d in dranges
       ])

df2.show()
+-------+------+----------+-------+---------+----------+----------+-----------+------------+
|user_id|amount|trans_date|balance|days_0_30|days_31_60|days_61_90|days_91_120|days_121_150|
+-------+------+----------+-------+---------+----------+----------+-----------+------------+
|    101| 99.10|2019-06-04|  85.03|     null|      null|      null|      85.03|        null|
|    101|-69.81|2019-09-11|  85.03|    85.03|      null|      null|       null|        null|
|    101| 12.51|2018-12-14|  85.03|     null|      null|      null|       null|        null|
|    101| 43.23|2018-09-11|  85.03|     null|      null|      null|       null|        null|
|    102| 89.27|2019-06-04| 178.37|     null|      null|      null|     178.37|        null|
|    102| 89.10|2019-03-04| 178.37|     null|      null|      null|       null|        null|
|    103| 73.11|2019-09-10|  73.11|    73.11|      null|      null|       null|        null|
+-------+------+----------+-------+---------+----------+----------+-----------+------------+

Примечание: выглядитвы используете 90-дневные интервалы вместо 30-дневных в вашем коде.но я полагаю, что вам легко настроить приведенный выше код в соответствии с вашими потребностями.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...