У меня есть следующие тестовые данные, и я должен проверить следующее утверждение с помощью pyspark (данные на самом деле очень велики: 700000 транзакций, каждая транзакция с более чем 10 продуктами):
import pandas as pd
import datetime
data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
'customerid': [1, 2, 2, 3, 4, 3], 'productids': ['A;B', 'D;E', 'H;X', 'P;Q;G', 'S;T;U', 'C;G']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
"The транзакции, которые существуют для идентификатора клиента в течение x дней, характеризуются по крайней мере одним идентичным продуктом в корзине. "
Пока у меня есть следующий подход (пример x = 2):
spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "500g") \
.appName('my-pandasToSparkDF-app') \
.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")
df=spark.createDataFrame(data)
x = 2
win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)
test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\
.withColumn("flat_col", F.array_distinct(F.flatten((F.collect_list("productids").over(win))))).orderBy(F.col("date"))
test = test.toPandas()
Таким образом, по каждой транзакции мы смотрим на 2 дня в прошлом, группируем по обычному количеству и соответствующие продукты суммируются в столбце "flat_col".
Но на самом деле мне нужно пересечение корзин с одинаковым идентификатором. Только тогда я могу судить, есть ли общие продукты.
Так что вместо ['P', 'Q', 'G', 'C'] в пятом ряду "flat_col" должно быть ['Г ']. Точно так же [] должен появиться во всех других строках "flat_col".
Большое вам спасибо!