Есть ли способ гарантировать, что все CheckpointListeners уведомлены о завершении контрольной точки на Flink при отмене задания с помощью точки сохранения? - PullRequest
0 голосов
/ 07 августа 2020

Я использую flink 1.9 и REST API /jobs/:jobid/savepoints, чтобы вызвать точку сохранения и отменить задание (изящно остановить задание для последующего запуска из точки сохранения). исходная функция, поэтому мой источник реализует интерфейсы CheckpointedFunction и CheckpointListener. При вызове метода snapshotState() я делаю снимок внутреннего состояния, а на notifyCheckpointComplete() - состояние контрольной точки в стороннюю систему.

Из того, что я вижу из исходного кода, только часть snapshotState() синхронна в CheckpointCoordinator -

// send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

Подтверждение контрольной точки и уведомление о завершении является асинхронным в AsyncCheckpointRunnable.

При этом срабатывает savepoint с cancel-job, установленным на true , после того, как снимок сделан, некоторые диспетчеры задач продолжают получать уведомление о завершении перед отменой задания и выполняют notifyCheckpointComplete(), а некоторые нет.

Вопрос в том, есть ли способ отменить задание с помощью точка сохранения, чтобы notifyCheckpointComplete() гарантированно вызывалась всеми диспетчерами задач до отмены задания, или в данный момент нет возможности достичь этого?

Ответы [ 2 ]

2 голосов
/ 07 августа 2020

Прошло много времени с тех пор, как я смотрел Flink 1.9, поэтому, пожалуйста, примите мой ответ с некоторой осторожностью.

Я предполагаю, что ваши источники отменяют слишком рано. Итак, notifyCheckpointComplete фактически отправляется всем задачам, но некоторые SourceFunction уже завершили работу run, и соответствующая задача очищена.

Афайк, то, что вы описали, должно быть возможным, если вы проигнорируете отмену и прерывания, пока вы не получите последний notifyCheckpointComplete.

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // start two-phase commit
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // finish two-phase commit
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // do normal source stuff
        }
        // keep the task running after cancellation
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // ignore interruptions until two-phase commit is done
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}
1 голос
/ 10 августа 2020
...