у меня простой конвейер
env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
.keyBy(_.id)
.process(new Processor).uid("processor")
.addSink(kafkaProducer).name(producerName)
Теперь я попытался просто добавить uid
в мойку, как это
env.addSource(kafkaConsumer).uid("kafka-src").name(consumerName)
.keyBy(_.id)
.process(new Processor).uid("processor")
.addSink(kafkaProducer).name(producerName).uid("kafka-sink")
но я получаю очень длинное исключение, которое кажется, что это часть сообщения:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/tmp/rocksdb/savepoint-445173-011657873d74. Cannot map checkpoint/savepoint state for operator 3cfeb06db0484d5556a7de8db2025f09 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
at org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1251)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1175)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:299)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
это имеет смысл? есть ли способ ее решить без потери точки сохранения?