Flink [1.3.0] application writes nothing to the HDFS state backend
29 October 2019

I have a simple stateful Flink application with checkpointing enabled, and I want the checkpoint data to be written to hdfs:///flink-savepoint/FlinkKafkaTest-checkpoint/${job_id}. After the application had been running for a while, I checked that directory: it does exist in HDFS, but there is nothing under it (no chk_xx directories).
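The per-job directory can be worked out from the configured base URI. The sketch below uses hypothetical helper names (`CheckpointPathCheck`, `describe`, `jobDir`) and only `java.net.URI` to split a checkpoint URI into scheme, authority and path, then appends the job id in the `<base>/<job id>` layout that the `DefaultOperatorStateBackend` line in the TaskManager log below shows. It assumes that an empty authority (the `hdfs:///` form) is filled in from the cluster's default file system. Comparing both URIs this way is a quick check of which HDFS path is actually being inspected, since the code configures `/flink-savepoint/...` while the log reports `/user/ioc/flink-savepoint/...`.

```scala
import java.net.URI

object CheckpointPathCheck {
  // Split a checkpoint URI into (scheme, authority, path).
  // An empty authority ("hdfs:///...") comes back as None; the HDFS client is
  // assumed to substitute the cluster's default file system in that case.
  def describe(uri: String): (String, Option[String], String) = {
    val u = new URI(uri)
    (u.getScheme, Option(u.getAuthority), u.getPath)
  }

  // Per-job checkpoint directory, following the <base>/<job id> layout seen in
  // the TaskManager log (an observed layout, not a Flink API).
  def jobDir(base: String, jobId: String): String =
    base.stripSuffix("/") + "/" + jobId

  def main(args: Array[String]): Unit = {
    println(describe("hdfs:///flink-savepoint/FlinkKafkaTest-checkpoint"))
    println(describe("hdfs://hacluster/user/ioc/flink-savepoint/FlinkKafkaTest-checkpoint"))
  }
}
```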

I also checked the TaskManager log, and the checkpoints do seem to complete periodically (see the output lines "snapshotState is called" and "checkpointComplete, checkpointId: 13").

The command I run is:

flink run -m yarn-cluster -yn 6 ...

My code is below the TaskManager log. Where is the problem? Any help would be appreciated, thanks!

18:48:40.685 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Trigger for Source: Custom Source (1/4) (019e4a7ece0baeb6e1d1e6a5c0a0a4b5). on task Source: Custom Source (1/4)
18:48:40.685 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Creating FileSystem stream leak safety net for Async calls on Source: Custom Source (1/4)
18:48:40.685 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (13) FULL_CHECKPOINT on task Source: Custom Source (1/4)
18:48:40.685 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 0
18:48:40.685 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Starting stream alignment for checkpoint 13.
snapshotState is called
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 1
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 2
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received barrier from channel 3
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Received all barriers, triggering checkpoint 13 at 1572346120678
18:48:40.686 [Async calls on Source: Custom Source (1/4)] INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ hdfs://hacluster/user/ioc/flink-savepoint/FlinkKafkaTest-checkpoint/11c79683b332737eb1fb0ae84ff91e71, synchronous part) in thread Thread[Async calls on Source: Custom Source (1/4),5,Flink Task Threads] took 1 ms.
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - End of stream alignment, feeding buffered data back
18:48:40.686 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 13 on task Source: Custom Source (1/4)
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.io.BarrierBuffer - Size of buffered data: 0 bytes
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Starting checkpoint (13) FULL_CHECKPOINT on task Sink: Unnamed (1/6)
18:48:40.686 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source (1/4) - finished synchronous part of checkpoint 13.Alignment duration: 0 ms, snapshot duration 0 ms
18:48:40.686 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source (1/4)
18:48:40.686 [pool-5-thread-1] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Source: Custom Source (1/4) - finished asynchronous part of checkpoint 13. Asynchronous duration: 0 ms
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Finished synchronous checkpoints for checkpoint 13 on task Sink: Unnamed (1/6)
18:48:40.686 [Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Sink: Unnamed (1/6) - finished synchronous part of checkpoint 13.Alignment duration: 0 ms, snapshot duration 0 ms
18:48:40.686 [pool-4-thread-1] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Sink: Unnamed (1/6) - finished asynchronous part of checkpoint 13. Asynchronous duration: 0 ms
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 13@1572346120678 for 019e4a7ece0baeb6e1d1e6a5c0a0a4b5.
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Confirmation for Source: Custom Source (1/4) on task Source: Custom Source (1/4)
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.yarn.YarnTaskManager - Receiver ConfirmCheckpoint 13@1572346120678 for 30ee4b703a06714825ec5fa1d2324b5a.
18:48:40.928 [Async calls on Source: Custom Source (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Notification of complete checkpoint for task Source: Custom Source (1/4)
18:48:40.928 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Confirmation for Sink: Unnamed (1/6) on task Sink: Unnamed (1/6)
checkpointComplete, checkpointId: 13
18:48:40.928 [Async calls on Sink: Unnamed (1/6)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Notification of complete checkpoint for task Sink: Unnamed (1/6)
18:48:42.075 [flink-akka.actor.default-dispatcher-3] DEBUG akka.remote.RemoteWatcher - Sending Heartbeat to [akka.tcp://flink@dggtsp370-or:32586]
18:48:42.077 [flink-akka.actor.default-dispatcher-3] DEBUG akka.remote.RemoteWatcher - Received heartbeat rsp from [akka.tcp://flink@dggtsp370-or:32586]
18:48:42.624 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.yarn.YarnTaskManager - Sending heartbeat to JobManager
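On a longer log, the same evidence that checkpoints complete can be collected mechanically. A minimal sketch with hypothetical names (`CheckpointLogScan`, `startedIds`, `completedIds`) that matches the two line shapes quoted in the log above: `StreamTask`'s "Starting checkpoint (N)" message and the "checkpointComplete, checkpointId: N" line printed by the source function. The regexes are assumptions tied to the log format shown here, and the scan only reads text, so it says nothing about what was actually written to HDFS.

```scala
object CheckpointLogScan {
  // Line logged by StreamTask when a task starts a checkpoint.
  private val Started = """Starting checkpoint \((\d+)\)""".r.unanchored
  // Line printed by notifyCheckpointComplete in CheckpointSourceFunction.
  private val Completed = """checkpointComplete, checkpointId: (\d+)""".r.unanchored

  // Distinct checkpoint ids that at least one task started.
  def startedIds(lines: Seq[String]): Seq[Long] =
    lines.collect { case Started(id) => id.toLong }.distinct

  // Distinct checkpoint ids that the source saw confirmed as complete.
  def completedIds(lines: Seq[String]): Seq[Long] =
    lines.collect { case Completed(id) => id.toLong }.distinct

  def main(args: Array[String]): Unit = {
    val src = scala.io.Source.fromFile(args(0)) // path to a TaskManager log
    try {
      val lines = src.getLines().toList
      println(s"started: ${startedIds(lines)}, completed: ${completedIds(lines)}")
    } finally src.close()
  }
}
```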

The main method is defined as follows:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object CheckpointTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
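    // Trigger a checkpoint every 10 s (interval in ms) with exactly-once semantics
    env.enableCheckpointing(10 * 1000, CheckpointingMode.EXACTLY_ONCE)

    // Keep the externalized checkpoint when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Redundant: enableCheckpointing above already set the mode
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.addSource(new CheckpointSourceFunction).setParallelism(4).print()
    // Checkpoint data is expected under <path>/<job id>
    val path = "hdfs:///flink-savepoint/FlinkKafkaTest-checkpoint"
    val backend = new FsStateBackend(path)
    env.setStateBackend(backend)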
    env.execute("CheckpointTest")
  }
}
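`enableExternalizedCheckpoints` only decides what happens to a retained checkpoint when the job ends. The following is a plain-Scala model, not Flink code: the case objects mirror the two `CheckpointConfig.ExternalizedCheckpointCleanup` constants, and `retained` encodes the documented rule that `DELETE_ON_CANCELLATION` removes the checkpoint when the job is cancelled, `RETAIN_ON_CANCELLATION` keeps it, and checkpoint state is kept under either policy if the job fails. All names are hypothetical.

```scala
object ExternalizedCleanupModel {
  // Mirrors CheckpointConfig.ExternalizedCheckpointCleanup (model only).
  sealed trait Cleanup
  case object RetainOnCancellation extends Cleanup
  case object DeleteOnCancellation extends Cleanup

  // The two terminal outcomes the cleanup policy distinguishes.
  sealed trait JobEnd
  case object Cancelled extends JobEnd
  case object Failed extends JobEnd

  // Whether the externalized checkpoint survives the job ending this way.
  def retained(policy: Cleanup, end: JobEnd): Boolean = (policy, end) match {
    case (_, Failed)                       => true  // kept on failure under both policies
    case (RetainOnCancellation, Cancelled) => true
    case (DeleteOnCancellation, Cancelled) => false
  }
}
```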

And CheckpointSourceFunction is defined as:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{CheckpointListener, FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

class CheckpointSourceFunction extends RichParallelSourceFunction[String] with CheckpointedFunction with CheckpointListener {
  @volatile
  var running: Boolean = true
  var count: Long = 0
  var countState: ListState[Long] = null
  var subId: Int = 0


  override def open(parameters: Configuration): Unit = {
    subId = this.getRuntimeContext.getIndexOfThisSubtask + 1
  }

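  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      // Emit and advance the counter under the checkpoint lock so that
      // snapshotState never sees a count out of step with the emitted records
      ctx.getCheckpointLock.synchronized {
        ctx.collect(s"Hello-$count")
        count = count + 1
      }
      Thread.sleep(2000)
    }
  }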

  override def cancel(): Unit = {
    running = false
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    println("snapshotState is called")
    countState.clear()
    countState.add(count)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val desc = new ListStateDescriptor[Long]("countState", classOf[Long])
    countState = context.getOperatorStateStore.getListState(desc)
    count = countState.get().asScala.sum
    println(s"initializeState, count is: ${count}")
  }

  override def notifyCheckpointComplete(checkpointId: Long): Unit = {
    println(s"checkpointComplete, checkpointId: $checkpointId")
  }
}
...
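`initializeState` restores the counter as the sum of all list entries a subtask receives, which only makes a difference when a subtask gets more than one entry back, for example after restoring with a lower parallelism. The sketch below is a simplified plain-Scala model of that situation: `redistribute` deals the snapshotted entries out round-robin, which is an assumption for illustration and not necessarily the exact split Flink's operator-state repartitioner performs, and `restoredCounts` mirrors the `countState.get().asScala.sum` line. Names are hypothetical.

```scala
object ListStateRestoreModel {
  // Simplified model: deal every snapshotted entry out round-robin to the new
  // subtasks (an assumption; Flink's repartitioner may split differently).
  def redistribute(snapshots: Seq[Seq[Long]], newParallelism: Int): Seq[Seq[Long]] = {
    require(newParallelism > 0, "parallelism must be positive")
    val entries = snapshots.flatten.zipWithIndex
    (0 until newParallelism).map { subtask =>
      entries.collect { case (value, idx) if idx % newParallelism == subtask => value }
    }
  }

  // Mirrors initializeState: count = countState.get().asScala.sum
  def restoredCounts(assigned: Seq[Seq[Long]]): Seq[Long] = assigned.map(_.sum)

  def main(args: Array[String]): Unit = {
    // One counter per source subtask, as with setParallelism(4)
    val snapshots = Seq(Seq(10L), Seq(20L), Seq(30L), Seq(40L))
    println(restoredCounts(redistribute(snapshots, 2)))
  }
}
```

Whatever the new parallelism, the restored counters add up to the same total, because summing on restore never drops or duplicates an entry.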