Как создать последовательность событий (значений столбцов) для какого-либо другого столбца? - PullRequest
0 голосов
/ 28 апреля 2018

У меня есть фрейм данных Spark, как показано ниже -

val myDF = Seq(
(1,"A",100,0,0),
(1,"E",200,0,0),
(1,"",300,1,49),
(2,"A",200,0,0),
(2,"C",300,0,0),
(2,"D",100,0,0)
).toDF("visitor","channel","timestamp","purchase_flag","amount")


scala> myDF.show
+-------+-------+---------+-------------+------+
|visitor|channel|timestamp|purchase_flag|amount|
+-------+-------+---------+-------------+------+
|      1|      A|      100|            0|     0|
|      1|      E|      200|            0|     0|
|      1|       |      300|            1|    49|
|      2|      A|      200|            0|     0|
|      2|      C|      300|            0|     0|
|      2|      D|      100|            0|     0|
+-------+-------+---------+-------------+------+

Я хотел бы создать фрейм данных Sequence для каждого посетителя из myDF, который отслеживает путь посетителя к покупке, заказанной по измерению timestamp. Выходной кадр данных должен выглядеть следующим образом (-> может быть любым разделителем) -

+-------+---------------------+
|visitor|channel sequence     |
+-------+---------------------+
|      1| A->E->purchase      |
|      2| D->A->C->no_purchase|
+-------+---------------------+

Чтобы прояснить ситуацию, посетитель 2 был открыт для канала D, затем A, а затем C; и он не делает покупки. Следовательно, последовательность должна быть сформирована как D->A-C->no_purchase.

ПРИМЕЧАНИЕ: Всякий раз, когда происходит покупка, значение канала меняется на blank, а purchase_flag устанавливается на 1.

Я хочу сделать это, используя Scala UDF в Spark, чтобы я снова применил метод к другим наборам данных.

1 Ответ

0 голосов
/ 28 апреля 2018

Вот как это делается с помощью функции udf

val myDF = Seq(
  (1,"A",100,0,0),
  (1,"E",200,0,0),
  (1,"",300,1,49),
  (2,"A",200,0,0),
  (2,"C",300,0,0),
  (2,"D",100,0,0)
).toDF("visitor","channel","timestamp","purchase_flag","amount")

import org.apache.spark.sql.functions._
def sequenceUdf = udf((struct: Seq[Row], purchased: Seq[Int])=> struct.map(row => (row.getAs[String]("channel"), row.getAs[Int]("timestamp"))).sortBy(_._2).map(_._1).filterNot(_ == "").mkString("->")+{if(purchased.contains(1)) "->purchase" else "->no_purchase"})

myDF.groupBy("visitor").agg(collect_list(struct("channel", "timestamp")).as("struct"), collect_list("purchase_flag").as("purchased"))
  .select(col("visitor"), sequenceUdf(col("struct"), col("purchased")).as("channel sequence"))
  .show(false)

, который должен дать вам

+-------+--------------------+
|visitor|channel sequence    |
+-------+--------------------+
|1      |A->E->purchase      |
|2      |D->A->C->no_purchase|
+-------+--------------------+

Вы можете сделать его настолько общим, насколько сможете. это просто демонстрация того, как вы должны действовать

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...