Flink: как сохранить и восстановить ValueState - PullRequest
0 голосов
/ 28 сентября 2018

Я использую Flink, чтобы обогатить поток входных данных

case class Input( key: String, message: String )

предварительно вычисленными баллами

case class Score( key: String, score: Int )

и произвести вывод

case class Output( key: String, message: String, score: Int )

И вход, ипотоки партитур читаются из тем Kafka, и результирующий поток вывода также публикуется в Kafka

val processed = inputStream.connect( scoreStream )
                           .flatMap( new ScoreEnrichmentFunction )
                           .addSink( producer )

со следующей функцией ScoreEnrichment:

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
    val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
    lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )

    override def flatMap1( input: Input, out: Collector[Output] ): Unit = 
    {
        Option( scoreState.value ) match {
            case None => out.collect( Output( input.key, input.message, -1 ) )
            case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )  
        }
    }

    override def flatMap2( score: Score, out: Collector[Output] ): Unit = 
    {
        scoreState.update( score )
    } 
}

Это работает хорошо.Однако, если я возьму безопасную точку и откажусь от задания Flink, результаты, сохраненные в ValueState, будут потеряны, когда я возобновлю работу с точки сохранения.

Как я понимаю, кажется, что ScoreEnrichmentFunction необходимо расширитьс CheckPointedFunction

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction

, но я изо всех сил пытаюсь понять, как реализовать методы snapshotState и initializeState для работы с состоянием ключа

override def snapshotState( context: FunctionSnapshotContext ): Unit = ???


override def initializeState( context: FunctionInitializationContext ): Unit = ???

Обратите внимание, что я использую следующий env:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism( 2 )
    env.setBufferTimeout( 1 )
    env.enableCheckpointing( 1000 )
    env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
    env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
    env.getCheckpointConfig.setCheckpointTimeout( 60000 )
    env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
    env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )

1 Ответ

0 голосов
/ 29 сентября 2018

Я думаю, что нашел проблему.Я пытался использовать отдельные каталоги для контрольных точек и точек сохранения, в результате чего каталог точек сохранения и каталог FsStateBackend были разными.

Использование одного и того же каталога в

val backend = new FsStateBackend( "file:/data", true )
env.setStateBackend( backend )

ипри взятии точки сохранения

bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data

решает проблему.

...