Spark сессионизация с использованием фреймов данных - PullRequest
0 голосов
/ 04 февраля 2019

Я хочу выполнить сессионную обработку потока кликов для фрейма данных spark.Давайте загрузим фрейм данных, в котором есть события из нескольких сессий, со следующей схемой - enter image description here

И я хочу агрегировать (сшивать) сеансы, как это - enter image description here

Я изучил функции UDAF и Window, но не мог понять, как их использовать для этого конкретного варианта использования.Я знаю, что разбиение данных по идентификатору сеанса помещает все данные сеанса в один раздел, но как их объединить?

Идея состоит в том, чтобы объединить все события, относящиеся к каждому сеансу, в виде одной выходной записи.

1 Ответ

0 голосов
/ 04 февраля 2019

Вы можете использовать collect_set:

 def process(implicit spark: SparkSession) = {
      import spark._

      import org.apache.spark.sql.functions.{ concat, col, collect_set }

      val seq = Seq(Row(1, 1, "startTime=1549270909"), Row(1, 1, "endTime=1549270913"))

      val rdd = spark.sparkContext.parallelize(seq)

      val df1 = spark.createDataFrame(rdd, StructType(List(StructField("sessionId", IntegerType, false), StructField("userId", IntegerType, false), StructField("session", StringType, false))))

      df1.groupBy("sessionId").agg(collect_set("session"))
    }
  }

Это дает вам:

+---------+------------------------------------------+
|sessionId|collect_set(session)                      |
+---------+------------------------------------------+
|1        |[startTime=1549270909, endTime=1549270913]|
+---------+------------------------------------------+

в качестве вывода.

Если вам нужна более сложная логика, это может бытьвключается в следующий UDAF:

  class YourComplexLogicStrings extends UserDefinedAggregateFunction {
    override def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)

    override def bufferSchema: StructType = StructType(StructField("pair", StringType) :: Nil)

    override def dataType: DataType = StringType

    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val b = buffer.getAs[String](0)
      val i = input.getAs[String](0)
      buffer(0) = { if(b.isEmpty) b + i else b + " + " + i }
    }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val b1 = buffer1.getAs[String](0)
      val b2 = buffer2.getAs[String](0)
      if(!b1.isEmpty)
        buffer1(0) = (b1) ++ "," ++ (b2)
      else
        buffer1(0) = b2
    }

    override def evaluate(buffer: Row): Any = {
      val yourString = buffer.getAs[String](0)
      // Compute your logic and return another String
      yourString
    }
  }



def process0(implicit spark: SparkSession) = {

  import org.apache.spark.sql.functions.{ concat, col, collect_set }


  val agg0 = new YourComplexLogicStrings()

  val seq = Seq(Row(1, 1, "startTime=1549270909"), Row(1, 1, "endTime=1549270913"))

  val rdd = spark.sparkContext.parallelize(seq)

  val df1 = spark.createDataFrame(rdd, StructType(List(StructField("sessionId", IntegerType, false), StructField("userId", IntegerType, false), StructField("session", StringType, false))))

  df1.groupBy("sessionId").agg(agg0(col("session")))
}

Это дает:

+---------+---------------------------------------+
|sessionId|yourcomplexlogicstrings(session)       |
+---------+---------------------------------------+
|1        |startTime=1549270909,endTime=1549270913|
+---------+---------------------------------------+

Обратите внимание, что вы можете включить очень сложную логику, используя функции spark sql напрямую, если вы хотите избежать UDAF.

...