В моем простом приложении файлы контрольных точек не создаются - PullRequest
0 голосов
/ 26 сентября 2019

У меня есть следующее простое приложение flink, работающее в IDE, и я делаю контрольную точку каждые 5 секунд, и хотел бы записать данные контрольной точки в каталог file:///d:/applog/out/mycheckpoint/, но после некоторого запуска я остановил приложение, ноЯ ничего не нашел в каталоге file:///d:/applog/out/mycheckpoint/

Код:

import java.util.Date

import io.github.streamingwithflink.util.DateUtil
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    object SourceFunctionExample {

      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4)
        env.getCheckpointConfig.setCheckpointInterval(5 * 1000)
        env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        env.setStateBackend(new FsStateBackend("file:///d:/applog/out/mycheckpoint/"))
        val numbers: DataStream[Long] = env.addSource(new ReplayableCountSource)
        numbers.print()

        env.execute()
      }

    }


class ReplayableCountSource extends SourceFunction[Long] with CheckpointedFunction {

  var isRunning: Boolean = true
  var cnt: Long = _
  var offsetState: ListState[Long] = _

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {

    while (isRunning && cnt < Long.MaxValue) {
      ctx.getCheckpointLock.synchronized {
        // increment cnt
        cnt += 1
        ctx.collect(cnt)
      }
      Thread.sleep(200)
    }

  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(snapshotCtx: FunctionSnapshotContext): Unit = {
    println("snapshotState is called at " + DateUtil.format(new Date) + s", cnt is ${cnt}")
    // remove previous cnt
    offsetState.clear()
    // add current cnt
    offsetState.add(cnt)
  }

  override def initializeState(initCtx: FunctionInitializationContext): Unit = {
    // obtain operator list state to store the current cnt

    val desc = new ListStateDescriptor[Long]("offset", classOf[Long])
    offsetState = initCtx.getOperatorStateStore.getListState(desc)

    // initialize cnt variable from the checkpoint
    val it = offsetState.get()
    cnt = if (null == it || !it.iterator().hasNext) {
      -1L
    } else {
      it.iterator().next()
    }

    println("initializeState is called at " + DateUtil.format(new Date) + s", cnt is ${cnt}")
  }
}

1 Ответ

1 голос
/ 26 сентября 2019

Я протестировал приложение в Windows и Linux, и в обоих случаях файлы контрольных точек были созданы, как и ожидалось.

Обратите внимание, что программа продолжает работать в случае сбоя контрольной точки, например, из-за некоторых ошибок разрешения или неверного пути,

Flink регистрирует сообщение WARN с исключением, которое привело к сбою контрольной точки.

...