Я использую flink 1.9 и REST API /jobs/:jobid/savepoints
, чтобы вызвать точку сохранения и отменить задание (изящно остановить задание для последующего запуска из точки сохранения). исходная функция, поэтому мой источник реализует интерфейсы CheckpointedFunction
и CheckpointListener
. При вызове метода snapshotState()
я делаю снимок внутреннего состояния, а на notifyCheckpointComplete()
- состояние контрольной точки в стороннюю систему.
Из того, что я вижу из исходного кода, только часть snapshotState()
синхронна в CheckpointCoordinator
-
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
Подтверждение контрольной точки и уведомление о завершении является асинхронным в AsyncCheckpointRunnable
.
При этом срабатывает savepoint
с cancel-job
, установленным на true
, после того, как снимок сделан, некоторые диспетчеры задач продолжают получать уведомление о завершении перед отменой задания и выполняют notifyCheckpointComplete()
, а некоторые нет.
Вопрос в том, есть ли способ отменить задание с помощью точка сохранения, чтобы notifyCheckpointComplete()
гарантированно вызывалась всеми диспетчерами задач до отмены задания, или в данный момент нет возможности достичь этого?