Перво-наперво, надеюсь, я правильно форматирую свой вопрос.
У меня есть этот фрейм данных:
df = sc.parallelize([
('1112', 1, 0, 1, '2018-05-01'),
('1111', 1, 1, 1, '2018-05-01'),
('1111', 1, 3, 2, '2018-05-04'),
('1111', 1, 1, 2, '2018-05-05'),
('1111', 1, 1, 2, '2018-05-06'),
]).toDF(["customer_id", "buy_count", "date_difference", "expected_answer", "date"]).cache()
df.show()
+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer| date|
+-----------+---------+---------------+---------------+----------+
| 1111| 1| 1| 1|2018-05-01|
| 1111| 1| 3| 2|2018-05-04|
| 1111| 1| 1| 2|2018-05-05|
| 1111| 1| 1| 2|2018-05-06|
| 1112| 1| 0| 1|2018-05-01|
+-----------+---------+---------------+---------------+----------+
Я хочу создать столбец "Ожидаемый_ответ":
Есликлиент не покупал более 3 дней (date_difference> = 3), я хочу увеличить его buy_count на 1. Каждая покупка после этого должна иметь новый buy_count, если только он не покупает еще 3 дня, в этом случае buy_countснова увеличится.
Вот мой код и как далеко я продвинулся с ним.Кажется, проблема в том, что искра фактически не вменяет значение, а создает новый столбец.Есть ли способ обойти это?Я также пытался с Hive, точно такие же результаты.
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import when
windowSpec = func.lag(df['buy_count']).\
over(Window.partitionBy(df['customer_id']).\
orderBy(df['date'].asc()))
df.withColumn('buy_count', \
when(df['date_difference'] >=3, windowSpec +1).when(windowSpec.isNull(), 1)\
.otherwise(windowSpec)).show()
+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer| date|
+-----------+---------+---------------+---------------+----------+
| 1112| 1| 0| 1|2018-05-01|
| 1111| 1| 1| 1|2018-05-01|
| 1111| 2| 3| 2|2018-05-04|
| 1111| 1| 1| 2|2018-05-05|
| 1111| 1| 1| 2|2018-05-06|
+-----------+---------+---------------+---------------+----------+
Как я могу получить ожидаемый результат?Заранее спасибо.