Почему Flink не повторяет сбои оператора? - PullRequest
2 голосов
/ 23 сентября 2019

Я планирую перенести одно из наших приложений Spark в Apache Flink.Я пытаюсь понять его функцию отказоустойчивости.

Я выполнил следующий код, я не вижу, что Flink фактически пытается повторить любую задачу (или подзадачу).Это может привести к потере данных для меня.Что я должен сделать, чтобы убедиться, что все сбои покрыты Flink?

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("file:///my-path", false))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(0, TimeUnit.SECONDS) // delay
    ))
    env.enableCheckpointing(10L)
    val text = env.socketTextStream(hostName, port)
    text
      .map { input =>
        List(input)
      }.setParallelism(1)
      .flatMap { input =>
        println(s"running for $input")
        List.fill(5)(input.head) ::: {
          println("throw exception here")
          throw new RuntimeException("some exception")
          List("c")
        }
      }

Я ожидаю увидеть сообщение throw exception here пару раз на экране.Однако, когда я использую fixedDelayRestart, похоже, он просто игнорирует это сообщение и продолжает для других.

1 Ответ

1 голос
/ 24 сентября 2019

Зависит от того, как вы запустите приложение.

Я предполагаю, что вы используете это из своей IDE.В этом случае StreamExecutionEnvironment.getExecutionEnvironment возвращает LocalStreamExecutionEnvironment, который запускает программу и весь Flink в одном процессе, то есть master (в Flink JobManager) и worker (TaskManager) запускаются как потоки в одном и том же процессе JVM.Исключение завершает этот единственный процесс.Следовательно, не осталось процесса Flink, который мог бы перезапустить программу.

Если вы хотите запустить программу с отказоустойчивостью, вам нужно отправить ее в среду Flink, например ту, которая работает на вашем локальном компьютере.,Загрузите дистрибутив Flink, распакуйте архив и запустите ./bin/start-cluster.sh.Это запустит два процесса, основной и рабочий процесс.Затем вы можете отправить программу в кластер, создав среду удаленного выполнения с StreamExecutionEnvironment.createRemoteEnvironment и передав в качестве параметров имя хоста и порт (см. Подробности в документации).

Обратите внимание, что исключение все равно убьет работникапроцесс.Таким образом, чтобы иметь возможность перезапустить программу, вам нужно вручную запустить рабочий процесс.В производственной среде об этом обычно заботятся Kubernetes, Yarn или Mesos.

Кстати, недавно мы добавили игровую площадку к документации Flink.Это среда песочницы на основе Docker, в которой можно поиграть с функциями отказоустойчивости Flink.Я рекомендую проверить это: Игровая площадка Flink Operations .

Еще несколько подсказок:

  • Интервал контрольных точек 10 мс составляет очень коротких.
  • Источник текстового сокета не предоставляет гарантии, по крайней мере, один раз (или точно один раз).Записи обрабатываются не более одного раза.
...