Я использую Flink, чтобы обогатить поток входных данных
case class Input( key: String, message: String )
предварительно вычисленными баллами
case class Score( key: String, score: Int )
и произвести вывод
case class Output( key: String, message: String, score: Int )
И вход, ипотоки партитур читаются из тем Kafka, и результирующий поток вывода также публикуется в Kafka
val processed = inputStream.connect( scoreStream )
.flatMap( new ScoreEnrichmentFunction )
.addSink( producer )
со следующей функцией ScoreEnrichment:
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )
override def flatMap1( input: Input, out: Collector[Output] ): Unit =
{
Option( scoreState.value ) match {
case None => out.collect( Output( input.key, input.message, -1 ) )
case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )
}
}
override def flatMap2( score: Score, out: Collector[Output] ): Unit =
{
scoreState.update( score )
}
}
Это работает хорошо.Однако, если я возьму безопасную точку и откажусь от задания Flink, результаты, сохраненные в ValueState, будут потеряны, когда я возобновлю работу с точки сохранения.
Как я понимаю, кажется, что ScoreEnrichmentFunction необходимо расширитьс CheckPointedFunction
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction
, но я изо всех сил пытаюсь понять, как реализовать методы snapshotState и initializeState для работы с состоянием ключа
override def snapshotState( context: FunctionSnapshotContext ): Unit = ???
override def initializeState( context: FunctionInitializationContext ): Unit = ???
Обратите внимание, что я использую следующий env:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism( 2 )
env.setBufferTimeout( 1 )
env.enableCheckpointing( 1000 )
env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
env.getCheckpointConfig.setCheckpointTimeout( 60000 )
env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )