Pyspark: пересечение нескольких массивов - PullRequest
0 голосов
/ 12 апреля 2020

У меня есть следующие тестовые данные, и я должен проверить следующее утверждение с помощью pyspark (данные на самом деле очень велики: 700000 транзакций, каждая транзакция с более чем 10 продуктами):

import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [1, 2, 2, 3, 4, 3], 'productids': ['A;B', 'D;E', 'H;X', 'P;Q;G', 'S;T;U', 'C;G']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

"The транзакции, которые существуют для идентификатора клиента в течение x дней, характеризуются по крайней мере одним идентичным продуктом в корзине. "

Пока у меня есть следующий подход (пример x = 2):

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")

df=spark.createDataFrame(data)

x = 2

win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)
test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\
    .withColumn("flat_col", F.array_distinct(F.flatten((F.collect_list("productids").over(win))))).orderBy(F.col("date"))

test = test.toPandas()

enter image description here

Таким образом, по каждой транзакции мы смотрим на 2 дня в прошлом, группируем по обычному количеству и соответствующие продукты суммируются в столбце "flat_col".

Но на самом деле мне нужно пересечение корзин с одинаковым идентификатором. Только тогда я могу судить, есть ли общие продукты.

Так что вместо ['P', 'Q', 'G', 'C'] в пятом ряду "flat_col" должно быть ['Г ']. Точно так же [] должен появиться во всех других строках "flat_col".

Большое вам спасибо!

Ответы [ 2 ]

1 голос
/ 12 апреля 2020

Этого можно достичь без self-join (, поскольку объединения дорогие shuffle операций в больших данных ), используя higher order functions в spark 2.4. Используемые функции filter,transform,aggregate.

df=spark.createDataFrame(data)

x = 2

win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)
test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\
    .withColumn("flat_col", F.flatten(F.collect_list("productids").over(win)))\
    .withColumn("occurances", F.expr("""filter(transform(productids, x->\
     IF(aggregate(flat_col, 0,(acc,t)->acc+IF(t=x,1,0))>1,x,null)),y->y!='null')"""))\
    .drop("flat_col").orderBy("date").show()

+-------------------+----------+----------+----------+
|               date|customerid|productids|occurances|
+-------------------+----------+----------+----------+
|2014-01-01 00:00:00|         1|    [A, B]|        []|
|2014-01-02 00:00:00|         2|    [D, E]|        []|
|2014-01-03 00:00:00|         2|    [H, X]|        []|
|2014-01-04 00:00:00|         3| [P, Q, G]|        []|
|2014-01-05 00:00:00|         4| [S, T, U]|        []|
|2014-01-06 00:00:00|         3|    [C, G]|       [G]|
+-------------------+----------+----------+----------+
0 голосов
/ 12 апреля 2020

Самостоятельное соединение - лучший трюк за всю историю

from pyspark.sql.functions import concat_ws, collect_list
spark.createDataFrame(data).registerTempTable("df")
sql("SELECT date, customerid, explode(split(productids, ';')) productid FROM df").registerTempTable("altered")
df = sql("SELECT al.date, al.customerid, al.productid productids, altr.productid flat_col FROM altered al left join altered altr on altr.customerid = al.customerid and al.productid = altr.productid and al.date != altr.date and datediff(al.date,altr.date) <=2 and datediff(al.date,altr.date) >=-2")
df.groupBy("date", "customerid").agg(concat_ws(",", collect_list("productids")).alias('productids'), concat_ws(",", collect_list("flat_col")).alias('flat_col')).show()

spark output

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...