Как применить UDF внутри пользовательского окна Spark для сеанса UDAF с использованием выражений Catalyst? - PullRequest
0 голосов
/ 11 мая 2019

Я пытаюсь написать сессионизирующий udaf для использования внутри спарк-окна. Я знаком с созданием базовых UDF, но не с выражениями катализатора более низкого уровня, так что терпите меня. Он должен быть достаточно гибким, чтобы поддерживать несколько случаев:

  1. Сеанс с нуля для сценария, в котором предоставляются данные с широким диапазоном дат (например, при обратной засыпке)
  2. Продолжайте использовать существующую строку сеанса, содержащую метку времени с начала этого сеанса, для сценария, когда добавляются новые данные в существующую таблицу (например, ежедневные или ежечасные обновления).

Для этой второй части код должен применить очень простую логику к входящей строке session_id: разбить строку по разделителю «___», извлечь 2-ую часть, привести ее к метке времени и присвоить ее переменной состояния firstTs. После присвоения ее переменной firstTs существующая логика будет использовать ее, чтобы решить, когда прервать текущий сеанс, а не начинать с нуля с этим событием, как это было бы в сценарии 1.

https://www.congiu.com/custom-window-function-in-spark-to-create-session-ids/ было очень полезно для начала. Я смог закончить дело 1, но 2 все еще озадачивает меня. Я попытался создать защищенные значения, содержащие функции, которые я бы использовал внутри части updateExpressions, но не могу понять, как точно определить и вызвать его.

Это текущий код:

import java.util.UUID

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, And, AttributeReference, EqualTo, Expression, If, IsNotNull, LessThanOrEqual, Literal, ScalaUDF, Subtract}
import org.apache.spark.sql.types.{DataType, LongType, StringType}
import org.apache.spark.sql.functions.lit
import org.apache.spark.unsafe.types.UTF8String

//https://www.congiu.com/custom-window-function-in-spark-to-create-session-ids/
object Sessionizer {
  val defaultMaxGapms      = 60 * 30  * 1000 // 30 mins
  val defaultMaxDurationms = 3600 * 2 * 1000 // 2 hours
  case class SessionUDWF( timestamp:Expression
                        ,session:Expression
                        ,sessionMaxGap:Expression = Literal(defaultMaxGapms)
                        ,sessionMaxDuration:Expression = Literal(defaultMaxDurationms)
                        ) extends AggregateWindowFunction {
    self: Product =>

    override def children: Seq[Expression] = Seq(timestamp, session)
    override def dataType: DataType = StringType

    protected val zero = Literal( 0L )
    protected val nullString = Literal(null:String)

    // defining the state variables and their types
    protected val currentSession =    AttributeReference("currentSession", StringType, nullable = true)()
    protected val previousTs     =    AttributeReference("lastTs"        , LongType  , nullable = false)()
    protected val firstTs        =    AttributeReference("firstTs"       , LongType  , nullable = false)()

    override val aggBufferAttributes: Seq[AttributeReference] =  currentSession  :: previousTs :: firstTs :: Nil

    protected val assignSession =  {
      val r = If(
        And(
          LessThanOrEqual(Subtract(timestamp, aggBufferAttributes(2)), sessionMaxDuration),
          LessThanOrEqual(Subtract(timestamp, aggBufferAttributes(1)), sessionMaxGap)
        ),
        aggBufferAttributes.head,
        ScalaUDF( createNewSession, StringType, children = Nil))
      r
    }
    override val initialValues: Seq[Expression] =  nullString :: zero :: zero :: Nil

    // if a session is already assigned, keep it, otherwise, assign one
    // if current ts > first ts, keep first ts. if no first ts, assign current ts.
    override val updateExpressions: Seq[Expression] = {
      // first update the session string.
      If(
        IsNotNull(session)
        ,session
        , assignSession) ::
      // then update the previous timestamp
      timestamp ::
      // update first timestamp of current session
        If(
          LessThanOrEqual(Subtract(timestamp, aggBufferAttributes(2)), sessionMaxDuration) // if (current timestamp - first session timestamp) is smaller than the max_duration
          , aggBufferAttributes(2) // if the delta is smaller than allowed duration delta, keep the first timestamp. then we in the middle of a session and haven't reached the max duration yet.
          ,  If(EqualTo(aggBufferAttributes(2), lit(0L).expr) // if condition = false. we need to now check whether it's equal to 0L (initialization).
          , timestamp // then assign the current timestamp
            , timestamp // if it's not smaller
          )) ::
        Nil // todo what is this one doing here?
    }

    override val evaluateExpression: Expression = aggBufferAttributes(0)
    override def prettyName: String = "makeSession"
  }
  // this is invoked whenever we need to create a a new session ID
  protected val  createNewSession= () => UTF8String.fromString(UUID.randomUUID().toString)

  // todo how to only define using sessionMaxGap without maxduration?
  def calculateSession(ts:Column,sess:Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, Literal(defaultMaxGapms)) }
  def calculateSession(ts:Column,sess:Column, sessionMaxGap:Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, sessionMaxGap.expr) }
  def calculateSession(ts:Column,sess:Column, sessionMaxGap:Column, sessionMaxDuration: Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, sessionMaxGap.expr, sessionMaxDuration.expr) }
  def calculateSession(ts:Column,sess:Column, sessionMaxGap:Column, sessionMaxDuration: Column, sessionRootName: Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, sessionMaxGap.expr, sessionMaxDuration.expr) }

  private def withExpr(expr: Expression): Column = new Column(expr)
}

Это будет тестовый код:

case class UserActivityData(user:String, epoch:Long, session:String)
val st = System.currentTimeMillis()
val one_minute = 60 * 1000

val dWithFirstTs = Array[UserActivityData](
    UserActivityData("user1",  st, "f237e656-1e53-4a24-9ad5-2b4576a4125d___2016-01-01 12:54:33"),
    UserActivityData("user1",  st +  10*one_minute, null),
    UserActivityData("user1",  st +  15*one_minute, null),
    UserActivityData("user1",  st +  44*one_minute, null),
    UserActivityData("user1",  st + 125*one_minute, null),
    UserActivityData("user1",  st + 150*one_minute, null),
    UserActivityData("user1",  st + 300*one_minute, null),
    UserActivityData("user2",  st +   5*one_minute, null),
    UserActivityData("user2",  st +  15*one_minute, null)
  )

val df = spark.sqlContext.createDataFrame(spark.sparkContext.parallelize(dWithFirstTs))
val specs = Window.partitionBy(f.col("user")).orderBy(f.col("epoch").asc)
val res = df.withColumn( "newsession", Sessionizer.calculateSession(f.col("epoch"), f.col("session")) over specs)

df.show(20)
res.withColumn("ts",f.expr("cast(epoch / 1000 as timestamp)")).orderBy("user","epoch").show(20, false)

Я бы хотел получить метку времени в первой строке и использовать ее в качестве переменной состояния 'firstTs'. Это включит пошаговую сессионизацию без необходимости перечитывать более ранние данные и пересчитывать столбец session_id.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...