Я пытаюсь использовать KeyedProcessFunction, но переменная ctx: Context
в processFunction
внутри моего KeyedProcessFunction
возвращает ноль. Обратите внимание, что я использую значение по умолчанию TimeCharacteristic
, равное ProcessingTime
(так что я даже не устанавливаю его).
Я нашел это в stackoverflow, но это относится к EventTime, а не ProcessingTime.
Следуя точному примеру https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example, я создал следующее, используя Scala 2.11.12 и Flink 1.10, и я все еще получаю то же самое ошибка.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object example {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// the source data stream
val stream = env.socketTextStream("localhost", 9999).map(x => {
var splitCsv = x.stripLineEnd.split(",")
(splitCsv(0), splitCsv(1))
}
)
// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
result.print()
env.execute("Flink Streaming Demo STDOUT")
}
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(
value: (String, String),
ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,
out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,
out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}
}
Вот ошибка:
Причина: java .lang.NullPointerException в scala .Predef $ .Long2long (Predef. scala : 363) в com.leidos.example $ CountWithTimeoutFunction.processElement (пример. scala: 57) в com.leidos.example $ CountWithTimeoutFunction.processElement (пример. scala: 42) в org. apache .flink. streaming.api.operators.KeyedProcessOperator.processElement (KeyedProcessOperator. java: 85) в org. apache .flink.streaming.runtime.tasks.OneInputStreamTask $ StreamTaskNetworkOutput.emitRecord * 17InputStreamT * 17g или. apache .flink.streaming.runtime.io.StreamTaskNetworkInput.processElement (St reamTaskNetworkInput. java: 151) в орг. apache .flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext (StreamTaskNetworkInput. java: 128) в орг. apache .flink.streaming.runtime.io. StreamOneInputProcessor.processInput (StreamOneInputProcessor. java: 69) в орг. apache .flink.streaming.runtime.tasks.StreamTask.processInput (StreamTask. java: 311) в орг. apache .flink.streaming. runtime.tasks.mailbox.MailboxProcessor.runMailboxL oop (MailboxProcessor. java: 187) в org. apache .flink.streaming.runtime.tasks.StreamTask.runMailboxL oop (StreamTask. java: 487 ) в орг. apache .flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask. java: 470) в орг. apache .flink.runtime.taskmanager.Task.doRun (Задача. java : 707) в орг. apache .flink.runtime.taskmanager.Task.run (Задача. java: 532) в java .lang.Thread.run (Тема. java: 748)
Есть идеи, что я делаю не так? Заранее спасибо!