добавление uid к оператору вызывает «Невозможно отобразить состояние контрольной точки / точки сохранения для оператора» - PullRequest
0 голосов
/ 10 июля 2019

у меня простой конвейер

env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
    .keyBy(_.id)
    .process(new Processor).uid("processor")
    .addSink(kafkaProducer).name(producerName)

Теперь я попытался просто добавить uid в мойку, как это

env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
    .keyBy(_.id)
    .process(new Processor).uid("processor")
    .addSink(kafkaProducer).name(producerName).uid("kafka-sink")

но я получаю очень длинное исключение, которое кажется, что это часть сообщения:

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/tmp/rocksdb/savepoint-445173-011657873d74. Cannot map checkpoint/savepoint state for operator 3cfeb06db0484d5556a7de8db2025f09 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
    at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)
    at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1175)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:299)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)

это имеет смысл? есть ли способ ее решить без потери точки сохранения?

1 Ответ

0 голосов
/ 10 июля 2019

Эта проблема имеет смысл, потому что если вы не укажете идентификаторы вручную, они будут сгенерированы автоматически.Вероятно, сгенерированный идентификатор: 3cfeb06db0484d5556a7de8db2025f09.

У вас есть три варианта:

  1. Запуск задания без сохранения: это приведет к потере данных;
  2. Выполнитьзадание разрешает невосстановленное состояние;
  3. Используйте 3cfeb06db0484d5556a7de8db2025f09 как uid вашего оператора.

Вот несколько ссылок, которые могут вам помочь:

...