Контекст в processFunction для KeyedProcessFunction равен нулю - PullRequest
1 голос
/ 18 марта 2020

Я пытаюсь использовать KeyedProcessFunction, но переменная ctx: Context в processFunction внутри моего KeyedProcessFunction возвращает ноль. Обратите внимание, что я использую значение по умолчанию TimeCharacteristic, равное ProcessingTime (так что я даже не устанавливаю его).

Я нашел это в stackoverflow, но это относится к EventTime, а не ProcessingTime.

Следуя точному примеру https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example, я создал следующее, используя Scala 2.11.12 и Flink 1.10, и я все еще получаю то же самое ошибка.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object example {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // the source data stream
    val stream = env.socketTextStream("localhost", 9999).map(x => {
      var splitCsv = x.stripLineEnd.split(",")
      (splitCsv(0), splitCsv(1))
    }
    )

    // apply the process function onto a keyed stream
    val result: DataStream[Tuple2[String, Long]] = stream
      .keyBy(0)
      .process(new CountWithTimeoutFunction())

    result.print()

    env.execute("Flink Streaming Demo STDOUT")

  }

  /**
   * The data type stored in the state
   */
  case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

  /**
   * The implementation of the ProcessFunction that maintains the count and timeouts
   */
  class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {

    /** The state that is maintained by this process function */
    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


    override def processElement(
                                 value: (String, String),
                                 ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,
                                 out: Collector[(String, Long)]): Unit = {

      // initialize or retrieve/update the state
      val current: CountWithTimestamp = state.value match {
        case null =>
          CountWithTimestamp(value._1, 1, ctx.timestamp)
        case CountWithTimestamp(key, count, lastModified) =>
          CountWithTimestamp(key, count + 1, ctx.timestamp)
      }

      // write the state back
      state.update(current)

      // schedule the next timer 60 seconds from the current event time
      ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
    }

    override def onTimer(
                          timestamp: Long,
                          ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,
                          out: Collector[(String, Long)]): Unit = {

      state.value match {
        case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
          out.collect((key, count))
        case _ =>
      }
    }
  }
}

Вот ошибка:

Причина: java .lang.NullPointerException в scala .Predef $ .Long2long (Predef. scala : 363) в com.leidos.example $ CountWithTimeoutFunction.processElement (пример. scala: 57) в com.leidos.example $ CountWithTimeoutFunction.processElement (пример. scala: 42) в org. apache .flink. streaming.api.operators.KeyedProcessOperator.processElement (KeyedProcessOperator. java: 85) в org. apache .flink.streaming.runtime.tasks.OneInputStreamTask $ StreamTaskNetworkOutput.emitRecord * 17InputStreamT * 17g или. apache .flink.streaming.runtime.io.StreamTaskNetworkInput.processElement (St reamTaskNetworkInput. java: 151) в орг. apache .flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext (StreamTaskNetworkInput. java: 128) в орг. apache .flink.streaming.runtime.io. StreamOneInputProcessor.processInput (StreamOneInputProcessor. java: 69) в орг. apache .flink.streaming.runtime.tasks.StreamTask.processInput (StreamTask. java: 311) в орг. apache .flink.streaming. runtime.tasks.mailbox.MailboxProcessor.runMailboxL oop (MailboxProcessor. java: 187) в org. apache .flink.streaming.runtime.tasks.StreamTask.runMailboxL oop (StreamTask. java: 487 ) в орг. apache .flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask. java: 470) в орг. apache .flink.runtime.taskmanager.Task.doRun (Задача. java : 707) в орг. apache .flink.runtime.taskmanager.Task.run (Задача. java: 532) в java .lang.Thread.run (Тема. java: 748)

Есть идеи, что я делаю не так? Заранее спасибо!

1 Ответ

1 голос
/ 19 марта 2020

Проблема в том, что в строке 57 вы получаете доступ к полю timestamp поля Context. Это поле равно null, если вы используете ProcessingTime или если вы не указали экстрактор метки времени при использовании EventTime.

...