pyspark: использование функции window () и сравнение строк - PullRequest
1 голос
/ 08 апреля 2020

У меня есть следующие тестовые данные. С этими данными, если нужно запрограммировать в pyspark следующее правило (в действительности данные действительно большие):

import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [1, 2, 2, 3, 4, 3], 'productids': ['A;B', 'D;E', 'H;X', 'P;Q;G', 'S;T;U', 'C;G']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

Правило гласит:

"Для custommerid существует более x транзакций с одним идентичным продуктом в корзине в течение y дней. "

В моем примере я вернул go назад x = 2 дня и проверил, по крайней мере, y = 1 совпадающий параметр Customerid. Результат должен выглядеть следующим образом:

date      |customerid|result
2014-01-01|1         |0     
2014-01-02|2         |0         
2014-01-03|2         |0         
2014-01-04|3         |0         
2014-01-05|4         |0         
2014-01-06|3         |1

Для временного окна в 2 дня, только для 2014-01-06 у нас есть случай, когда происходит идентичный customerid (customerid 3 на 2014-01-04) и есть также один соответствующий продукт (G).

Я знаю, что мог бы использовать временное окно, подобное этому:

win = Window().partitionBy('customerid').orderBy((F.col('date')).cast("long")).rangeBetween(
        -(2*86400), Window.currentRow)

К сожалению, в данный момент я не продвигаюсь дальше. Я также абсолютно не знаю, как сравнивать продукты, так как они всегда доступны только в виде длинной строки.

Спасибо!

1 Ответ

1 голос
/ 08 апреля 2020

Это будет работать для spark2.4 + (из-за array_distinct). Пока ваши productids разделены ;, мы можем split в этом разделителе создать список. С window у вас уже есть, мы collect_list, flatten это, а затем посмотрим, сколько дубликатов мы иметь. Количество дубликатов - желаемое result.

from pyspark.sql import functions as F
from pyspark.sql.window import Window    
df=spark.createDataFrame(data)
w=Window().partitionBy("customerid").orderBy(F.col("date").cast("long")).rangeBetween(-86400*2,0)
df.withColumn("productids", F.split("productids", "\;"))\
  .withColumn("products", F.flatten(F.collect_list("productids").over(w)))\
  .withColumn("result", F.size("products") - F.size(F.array_distinct("products")))\
  .orderBy(F.col("date")).drop("productids","products").show()


+-------------------+----------+------+
|               date|customerid|result|
+-------------------+----------+------+
|2014-01-01 00:00:00|         1|     0|
|2014-01-02 00:00:00|         2|     0|
|2014-01-03 00:00:00|         2|     0|
|2014-01-04 00:00:00|         3|     0|
|2014-01-05 00:00:00|         4|     0|
|2014-01-06 00:00:00|         3|     1|
+-------------------+----------+------+

ОБНОВЛЕНИЕ:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("customerid").orderBy(F.col("date").cast("long")).rangeBetween(-86400*2,0)
df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\
  .withColumn("products", F.flatten((F.collect_list("productids").over(w))))\
  .withColumn("result", F.when(F.size("products")!=F.size(F.array_distinct("products")),F.lit(1)).otherwise(F.lit(0)))\
  .drop("productids","products").orderBy("date").show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...