Apache Flink: восстановление заданий при выполнении IDE не работает должным образом - PullRequest
0 голосов
/ 14 апреля 2019

У меня есть пример потокового WordCount примера, написанного на Flink (Scala).В нем я хочу использовать внешнюю контрольную точку для восстановления в случае сбоя.Но он не работает должным образом.

Мой код выглядит следующим образом:

object WordCount {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // prepare Kafka consumer properties
    val kafkaConsumerProperties = new Properties
    kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
    kafkaConsumerProperties.setProperty("group.id", "flink")
    kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")

    // set up Kafka Consumer
    val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)

    println("Executing WordCount example.")

    // get text from Kafka
    val text = env.addSource(kafkaConsumer)

    val counts: DataStream[(String, Int)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .mapWithState((in: (String, Int), count: Option[Int]) =>
        count match {
          case Some(c) => ((in._1, c), Some(c + in._2))
          case None => ((in._1, 1), Some(in._2 + 1))
        })

    // emit result
    println("Printing result to stdout.")
    counts.print()

    // execute program
    env.execute("Streaming WordCount")
  }
}

Вывод, который я получаю после первого запуска программы:

(hi, 1)
(hi, 2)

вывод, который я получаю после запуска программы во второй раз:

(hi, 1)

Я ожидаю, что запуск программы во второй раз должен дать мне следующий вывод:

(hi, 3)

Поскольку я новичок в ApacheФлинк, я не знаю, как добиться ожидаемого результата.Может ли кто-нибудь помочь мне добиться правильного поведения?

1 Ответ

3 голосов
/ 15 апреля 2019

Flink перезапускается только с последней контрольной точки, если приложение перезапускается в течение того же выполнения (обычное, автоматическое восстановление).

Если вы отмените задание, запущенное в локальной среде выполнения в IDE, вы уничтожите весь кластер, и задание не может быть автоматически восстановлено. Вместо этого вам нужно начать это снова. Чтобы перезапустить новое задание из точки сохранения (или внешней контрольной точки), необходимо указать путь к сохраненной точке сохранения / контрольной точке. Не уверен, что это возможно в локальной среде исполнения.

IMO, проще поиграть с контрольными точками и восстановлением на локальном экземпляре Flink, а не в IDE.

...