Я пытаюсь написать сессионизирующий udaf для использования внутри спарк-окна. Я знаком с созданием базовых UDF, но не с выражениями катализатора более низкого уровня, так что терпите меня. Он должен быть достаточно гибким, чтобы поддерживать несколько случаев:
- Сеанс с нуля для сценария, в котором предоставляются данные с широким диапазоном дат (например, при обратной засыпке)
- Продолжайте использовать существующую строку сеанса, содержащую метку времени с начала этого сеанса, для сценария, когда добавляются новые данные в существующую таблицу (например, ежедневные или ежечасные обновления).
Для этой второй части код должен применить очень простую логику к входящей строке session_id: разбить строку по разделителю «___», извлечь 2-ую часть, привести ее к метке времени и присвоить ее переменной состояния firstTs. После присвоения ее переменной firstTs существующая логика будет использовать ее, чтобы решить, когда прервать текущий сеанс, а не начинать с нуля с этим событием, как это было бы в сценарии 1.
https://www.congiu.com/custom-window-function-in-spark-to-create-session-ids/ было очень полезно для начала. Я смог закончить дело 1, но 2 все еще озадачивает меня. Я попытался создать защищенные значения, содержащие функции, которые я бы использовал внутри части updateExpressions, но не могу понять, как точно определить и вызвать его.
Это текущий код:
import java.util.UUID
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, And, AttributeReference, EqualTo, Expression, If, IsNotNull, LessThanOrEqual, Literal, ScalaUDF, Subtract}
import org.apache.spark.sql.types.{DataType, LongType, StringType}
import org.apache.spark.sql.functions.lit
import org.apache.spark.unsafe.types.UTF8String
//https://www.congiu.com/custom-window-function-in-spark-to-create-session-ids/
object Sessionizer {
val defaultMaxGapms = 60 * 30 * 1000 // 30 mins
val defaultMaxDurationms = 3600 * 2 * 1000 // 2 hours
case class SessionUDWF( timestamp:Expression
,session:Expression
,sessionMaxGap:Expression = Literal(defaultMaxGapms)
,sessionMaxDuration:Expression = Literal(defaultMaxDurationms)
) extends AggregateWindowFunction {
self: Product =>
override def children: Seq[Expression] = Seq(timestamp, session)
override def dataType: DataType = StringType
protected val zero = Literal( 0L )
protected val nullString = Literal(null:String)
// defining the state variables and their types
protected val currentSession = AttributeReference("currentSession", StringType, nullable = true)()
protected val previousTs = AttributeReference("lastTs" , LongType , nullable = false)()
protected val firstTs = AttributeReference("firstTs" , LongType , nullable = false)()
override val aggBufferAttributes: Seq[AttributeReference] = currentSession :: previousTs :: firstTs :: Nil
protected val assignSession = {
val r = If(
And(
LessThanOrEqual(Subtract(timestamp, aggBufferAttributes(2)), sessionMaxDuration),
LessThanOrEqual(Subtract(timestamp, aggBufferAttributes(1)), sessionMaxGap)
),
aggBufferAttributes.head,
ScalaUDF( createNewSession, StringType, children = Nil))
r
}
override val initialValues: Seq[Expression] = nullString :: zero :: zero :: Nil
// if a session is already assigned, keep it, otherwise, assign one
// if current ts > first ts, keep first ts. if no first ts, assign current ts.
override val updateExpressions: Seq[Expression] = {
// first update the session string.
If(
IsNotNull(session)
,session
, assignSession) ::
// then update the previous timestamp
timestamp ::
// update first timestamp of current session
If(
LessThanOrEqual(Subtract(timestamp, aggBufferAttributes(2)), sessionMaxDuration) // if (current timestamp - first session timestamp) is smaller than the max_duration
, aggBufferAttributes(2) // if the delta is smaller than allowed duration delta, keep the first timestamp. then we in the middle of a session and haven't reached the max duration yet.
, If(EqualTo(aggBufferAttributes(2), lit(0L).expr) // if condition = false. we need to now check whether it's equal to 0L (initialization).
, timestamp // then assign the current timestamp
, timestamp // if it's not smaller
)) ::
Nil // todo what is this one doing here?
}
override val evaluateExpression: Expression = aggBufferAttributes(0)
override def prettyName: String = "makeSession"
}
// this is invoked whenever we need to create a a new session ID
protected val createNewSession= () => UTF8String.fromString(UUID.randomUUID().toString)
// todo how to only define using sessionMaxGap without maxduration?
def calculateSession(ts:Column,sess:Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, Literal(defaultMaxGapms)) }
def calculateSession(ts:Column,sess:Column, sessionMaxGap:Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, sessionMaxGap.expr) }
def calculateSession(ts:Column,sess:Column, sessionMaxGap:Column, sessionMaxDuration: Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, sessionMaxGap.expr, sessionMaxDuration.expr) }
def calculateSession(ts:Column,sess:Column, sessionMaxGap:Column, sessionMaxDuration: Column, sessionRootName: Column): Column = withExpr { SessionUDWF(ts.expr,sess.expr, sessionMaxGap.expr, sessionMaxDuration.expr) }
private def withExpr(expr: Expression): Column = new Column(expr)
}
Это будет тестовый код:
case class UserActivityData(user:String, epoch:Long, session:String)
val st = System.currentTimeMillis()
val one_minute = 60 * 1000
val dWithFirstTs = Array[UserActivityData](
UserActivityData("user1", st, "f237e656-1e53-4a24-9ad5-2b4576a4125d___2016-01-01 12:54:33"),
UserActivityData("user1", st + 10*one_minute, null),
UserActivityData("user1", st + 15*one_minute, null),
UserActivityData("user1", st + 44*one_minute, null),
UserActivityData("user1", st + 125*one_minute, null),
UserActivityData("user1", st + 150*one_minute, null),
UserActivityData("user1", st + 300*one_minute, null),
UserActivityData("user2", st + 5*one_minute, null),
UserActivityData("user2", st + 15*one_minute, null)
)
val df = spark.sqlContext.createDataFrame(spark.sparkContext.parallelize(dWithFirstTs))
val specs = Window.partitionBy(f.col("user")).orderBy(f.col("epoch").asc)
val res = df.withColumn( "newsession", Sessionizer.calculateSession(f.col("epoch"), f.col("session")) over specs)
df.show(20)
res.withColumn("ts",f.expr("cast(epoch / 1000 as timestamp)")).orderBy("user","epoch").show(20, false)
Я бы хотел получить метку времени в первой строке и использовать ее в качестве переменной состояния 'firstTs'. Это включит пошаговую сессионизацию без необходимости перечитывать более ранние данные и пересчитывать столбец session_id.