Определение количества совместных сеансов на пару продуктов - PullRequest
1 голос
/ 22 мая 2019

У меня есть этот фрейм данных:

from pyspark.mllib.linalg.distributed import IndexedRow

rows = sc.parallelize([[1, "A"], [1, 'B'] , [1, "A"], [2, 'A'], [2, 'C'] ,[3,'A'], [3, 'B']])

rows_df = rows.toDF(["session_id", "product"])

rows_df.show()

+----------+-------+
|session_id|product|
+----------+-------+
|         1|      A|
|         1|      B|
|         1|      A|
|         2|      A|
|         2|      C|
|         3|      A|
|         3|      B|
+----------+-------+

Я хочу знать, сколько совместных сеансов проходит в каждой паре продуктов.Одни и те же продукты могут присутствовать в сеансе несколько раз, но я хочу, чтобы по одному сеансу для каждой пары продуктов было только одно количество.

Пример вывода:

+---------+---------+-----------------+
|product_a|product_b|num_join_sessions|
+---------+---------+-----------------+
|        A|        B|                2|
|        A|        C|                1|
|        B|        A|                2|
|        B|        C|                0|
|        C|        A|                1|
|        C|        B|                0|
+---------+---------+-----------------+

Я заблудился, как реализовать это в pyspark.

Ответы [ 2 ]

1 голос
/ 22 мая 2019

Получение общего количества сеансов для пар, имеющих совместные сеансы, довольно просто. Вы можете достичь этого, присоединив DataFrame к себе на session_id и отфильтровав строки, где product s одинаковы.

Затем вы группируете по парам product и подсчитываете различные session_id с.

import pyspark.sql.functions as f

rows_df.alias("l").join(rows_df.alias("r"), on="session_id", how="inner")\
    .where("l.product != r.product")\
    .groupBy(f.col("l.product").alias("product_a"), f.col("r.product").alias("product_b"))\
    .agg(f.countDistinct("session_id").alias("num_join_sessions"))\
    .show()
#+---------+---------+-----------------+
#|product_a|product_b|num_join_sessions|
#+---------+---------+-----------------+
#|        A|        C|                1|
#|        C|        A|                1|
#|        B|        A|                2|
#|        A|        B|                2|
#+---------+---------+-----------------+

(Примечание: если вы хотите использовать ТОЛЬКО уникальные пары продуктов, измените значение != на < в функции where).

Сложность в том, что вам также нужны пары, у которых нет совместных сеансов. Это может быть выполнено, но оно не будет эффективным, потому что вам нужно будет получать декартово произведение каждой пары продуктов.

Тем не менее, вот один подход:

Начните с вышеизложенного и ПРАВИЛЬНО объедините декартово произведение пар различных товаров.

rows_df.alias("l").join(rows_df.alias("r"), on="session_id", how="inner")\
    .where("l.product != r.product")\
    .groupBy(f.col("l.product").alias("product_a"), f.col("r.product").alias("product_b"))\
    .agg(f.countDistinct("session_id").alias("num_join_sessions"))\
    .join(
        rows_df.selectExpr("product AS product_a").distinct().crossJoin(
            rows_df.selectExpr("product AS product_b").distinct()
        ).where("product_a != product_b").alias("pairs"),
        on=["product_a", "product_b"],
        how="right"
    )\
    .fillna(0)\
    .sort("product_a", "product_b")\
    .show()
#+---------+---------+-----------------+
#|product_a|product_b|num_join_sessions|
#+---------+---------+-----------------+
#|        A|        B|                2|
#|        A|        C|                1|
#|        B|        A|                2|
#|        B|        C|                0|
#|        C|        A|                1|
#|        C|        B|                0|
#+---------+---------+-----------------+

Примечание: sort не требуется, но я включил его, чтобы соответствовать порядку желаемого выхода.

0 голосов
/ 22 мая 2019

Я считаю, что это должно сделать это:

import pyspark.sql.functions as F

joint_sessions = rows_df.withColumnRenamed(
    'product', 'product_a'
).join(
    rows_df.withColumnRenamed('product', 'product_b'),
    on='session_id',
    how='inner'
).filter(
    F.col('product_a') != F.col('product_b')
).groupBy(
    'product_a',
    'product_b'
).agg(
    F.countDistinct('session_id').alias('num_join_sessions')
).select(
    'product_a',
    'product_b',
    'num_join_sessions'
)

joint_sessions.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...