Я ищу способ настройки управления окнами, чтобы учесть задержки, а также позволить мне рассчитывать значения на основе предыдущих значений, рассчитанных для сеанса.
В целом значения моих сеансов являются уникальным идентификатором и никогда не должныесть столкновения, но технически сессии могут прийти в любое время.В большинстве сеансов большинство событий обрабатываются в течение 5 минут. Допустимое опоздание на 1 день должно удовлетворять любые поздние события.
stream
.keyBy { jsonEvent => jsonEvent.findValue("session").toString }
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)))
.allowedLateness(Time.days(1))
.process { new SessionProcessor }
.addSink { new HttpSink }
Для каждого сеанса я нахожу максимальное значение поля и проверяю, что несколькособытия не произошли (если они произойдут, то поле max value обнулится).Я решил создать ProcessWindowFunction
для этого.
Class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
//Parse and calculate data
maxValue = if(badEvent1) 0 else maxValue
maxValue = if(badEvent2) 0 else maxValue
out.collect((string1,string2,string3, maxValue))
}
}
Это прекрасно работает до учета поздних событий.Когда происходит позднее событие, maxValue
пересчитывается и снова выводится в HttpSink
.Я ищу способ, чтобы я мог рассчитать дельту предыдущего maxValue
и позднего maxValue
.
Я ищу способ определить:
- Если вызов функции выполнен из позднего события (я не хочу удваивать общее количество сеансов)
- Что такое новые данные или если есть способ сохранить предыдущее вычисленное значение.
Любая помощь с этим будет принята с благодарностью.
Редактировать:Новый код, используемый для ValueState
KafkaConsumer.scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema
import org.apache.flink.streaming.api.scala._
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaConsumer {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val properties = getServerProperties
val consumer = new FlinkKafkaConsumer010[ObjectNode]("test-topic", new JSONDeserializationSchema, properties)
consumer.setStartFromLatest()
val stream = env.addSource(consumer)
stream
.keyBy { jsonEvent => jsonEvent.findValue("data").findValue("query").findValue("session").toString }
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.days(1))
.process {
new SessionProcessor
}
.print
env.execute("Kafka APN Consumer")
}
}
SessionProcessor.scala
import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {
final val previousValue = new ValueStateDescriptor("previousValue", classOf[Long])
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
val previousVal: ValueState[Long] = context.windowState.getState(previousValue)
val pVal: Long = previousVal.value match {
case i: Long => i
}
var session = ""
var user = ""
var department = ""
var lVal: Long = 0
elements.foreach( value => {
var jVal: String = "0"
if (value.findValue("data").findValue("query").has("value")) {
jVal = value.findValue("data").findValue("query").findValue("value").toString replaceAll("\"", "")
}
session = value.findValue("data").findValue("query").findValue("session").toString replaceAll("\"", "")
user = value.findValue("data").findValue("query").findValue("user").toString replaceAll("\"", "")
department = value.findValue("data").findValue("query").findValue("department").toString replaceAll("\"", "")
lVal = if (jVal.toLong > lVal) jVal.toLong else lVal
})
val increaseTime = lVal - pVal
previousVal.update(increaseTime)
out.collect((session, user, department, increaseTime))
}
}