Как заменить значение условно в pySpark и использовать замененное значение для следующего условия - PullRequest
0 голосов
/ 23 октября 2018

Перво-наперво, надеюсь, я правильно форматирую свой вопрос.
У меня есть этот фрейм данных:

df = sc.parallelize([
('1112', 1, 0, 1, '2018-05-01'),
('1111', 1, 1, 1, '2018-05-01'),
('1111', 1, 3, 2, '2018-05-04'),
('1111', 1, 1, 2, '2018-05-05'),
('1111', 1, 1, 2, '2018-05-06'),
]).toDF(["customer_id", "buy_count", "date_difference", "expected_answer", "date"]).cache()

df.show()
+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer|      date|
+-----------+---------+---------------+---------------+----------+
|       1111|        1|              1|              1|2018-05-01|
|       1111|        1|              3|              2|2018-05-04|
|       1111|        1|              1|              2|2018-05-05|
|       1111|        1|              1|              2|2018-05-06|
|       1112|        1|              0|              1|2018-05-01|
+-----------+---------+---------------+---------------+----------+

Я хочу создать столбец "Ожидаемый_ответ":

Есликлиент не покупал более 3 дней (date_difference> = 3), я хочу увеличить его buy_count на 1. Каждая покупка после этого должна иметь новый buy_count, если только он не покупает еще 3 дня, в этом случае buy_countснова увеличится.

Вот мой код и как далеко я продвинулся с ним.Кажется, проблема в том, что искра фактически не вменяет значение, а создает новый столбец.Есть ли способ обойти это?Я также пытался с Hive, точно такие же результаты.

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import when

windowSpec = func.lag(df['buy_count']).\
over(Window.partitionBy(df['customer_id']).\
orderBy(df['date'].asc()))

df.withColumn('buy_count', \
              when(df['date_difference'] >=3, windowSpec +1).when(windowSpec.isNull(), 1)\
              .otherwise(windowSpec)).show()

+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer|      date|
+-----------+---------+---------------+---------------+----------+
|       1112|        1|              0|              1|2018-05-01|
|       1111|        1|              1|              1|2018-05-01|
|       1111|        2|              3|              2|2018-05-04|
|       1111|        1|              1|              2|2018-05-05|
|       1111|        1|              1|              2|2018-05-06|
+-----------+---------+---------------+---------------+----------+

Как я могу получить ожидаемый результат?Заранее спасибо.

1 Ответ

0 голосов
/ 23 октября 2018

Разобрался наконец.Спасибо всем за указание на подобные случаи.

У меня сложилось впечатление, что SUM () по разделу суммирует по всему разделу, а не просто суммирует все до текущей строки.К счастью, мне удалось решить мою проблему с помощью очень простого SQL:

SELECT SUM(CASE WHEN(date_difference>=3) THEN 1 ELSE 0 END) OVER (PARTITION BY customer_id ORDER BY date) 
       FROM df

sqlContext.sql(qry).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...