У меня есть простое приложение flink с отслеживанием состояния и я включил контрольную точку, я бы записал данные контрольной точки в hdfs: / flink-savepoint / FlinkKafkaTest-checkpoint / $ {job_id}. Когда приложение какое-то время запускается, я проверил каталог, и этот каталог действительно существует в HDFS, но под ним ничего нет (никаких каталогов chk_xx).
Кроме того, я проверил журнал TaskManager, похоже, что контрольная точка периодически завершалась (см. Вывод: snapshotState is called and checkpointComplete, checkpointId: 1
3)
Команда, которую я запускаю, -:
flink run -m yarn-cluster -yn 6 ...
Мой код находится ниже журнала TaskManager, я бы спросил, где проблема, и спасите меня, спасибо!
18:48:40.685 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Trigger for Source: Custom Source (1/4) (019e4a7ece0baeb6e1d1e6a5c0a0a4b5). on task Source: Custom Source (1/4)
18:48:40.685 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for Async calls on Source: Custom Source (1/4)
18:48:40.685 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (13) FULL_CHECKPOINT on task Source: Custom Source (1/4)
18:48:40.685 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 0
18:48:40.685 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Starting stream alignment for checkpoint 13.
snapshotState is called
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 1
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 2
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 3
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received all barriers, triggering checkpoint 13 at 1572346120678
18:48:40.686 [Async calls on Source: Custom Source (1/4)] INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ hdfs://hacluster/user/ioc/flink-savepoint/FlinkKafkaTest-checkpoint/11c79683b332737eb1fb0ae84ff91e71, synchronous part) in thread Thread[Async calls on Source: Custom Source (1/4),5,Flink Task Threads] took 1 ms.
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - End of stream alignment, feeding buffered data back
18:48:40.686 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 13 on task Source: Custom Source (1/4)
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Size of buffered data: 0 bytes
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (13) FULL_CHECKPOINT on task Sink: Unnamed (1/6)
18:48:40.686 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source (1/4) - finished synchronous part of checkpoint 13.Alignment duration: 0 ms, snapshot duration 0 ms
18:48:40.686 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (1/4)
18:48:40.686 [pool-5-thread-1] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source (1/4) - finished asynchronous part of checkpoint 13. Asynchronous duration: 0 ms
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 13 on task Sink: Unnamed (1/6)
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Sink: Unnamed (1/6) - finished synchronous part of checkpoint 13.Alignment duration: 0 ms, snapshot duration 0 ms
18:48:40.686 [pool-4-thread-1] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Sink: Unnamed (1/6) - finished asynchronous part of checkpoint 13. Asynchronous duration: 0 ms
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 13@1572346120678 for 019e4a7ece0baeb6e1d1e6a5c0a0a4b5.
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Confirmation for Source: Custom Source (1/4) on task Source: Custom Source (1/4)
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 13@1572346120678 for 30ee4b703a06714825ec5fa1d2324b5a.
18:48:40.928 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Notification of complete checkpoint for task Source: Custom Source (1/4)
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Confirmation for Sink: Unnamed (1/6) on task Sink: Unnamed (1/6)
checkpointComplete, checkpointId: 13
18:48:40.928 [Async calls on Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Notification of complete checkpoint for task Sink: Unnamed (1/6)
18:48:42.075 [flink-akka.actor.default-dispatcher-3] DEBUG akka.remote.RemoteWatcher - Sending Heartbeat to [akka.tcp://flink@dggtsp370-or:32586]
18:48:42.077 [flink-akka.actor.default-dispatcher-3] DEBUG akka.remote.RemoteWatcher - Received heartbeat rsp from [akka.tcp://flink@dggtsp370-or:32586]
18:48:42.624 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.yarn.YarnTaskManager - Sending heartbeat to JobManager
Основной метод определяется следующим образом:
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object CheckpointTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10 * 1000, CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.addSource(new CheckpointSourceFunction).setParallelism(4).print()
val path = "hdfs:///flink-savepoint/FlinkKafkaTest-checkpoint"
val backend = new FsStateBackend(path)
env.setStateBackend(backend)
env.execute("CheckpointTest")
}
}
и CheckpointSoruceFunction определяется как:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{CheckpointListener, FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.collection.JavaConverters._
class CheckpointSourceFunction extends RichParallelSourceFunction[String] with CheckpointedFunction with CheckpointListener {
@volatile
var running: Boolean = true
var count: Long = 0
var countState: ListState[Long] = null
var subId: Int = 0
override def open(parameters: Configuration): Unit = {
subId = this.getRuntimeContext.getIndexOfThisSubtask + 1
}
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
while (running) {
ctx.collect(s"Hello-$count")
count = count + 1
Thread.sleep(2000)
}
}
override def cancel(): Unit = {
running = false
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
println("snapshotState is called")
countState.clear()
countState.add(count)
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val desc = new ListStateDescriptor[Long]("countState", classOf[Long])
countState = context.getOperatorStateStore.getListState(desc)
count = countState.get().asScala.sum
println(s"initializeState, count is: ${count}")
}
override def notifyCheckpointComplete(checkpointId: Long): Unit = {
println(s"checkpointComplete, checkpointId: $checkpointId")
}
}