Всегда окончательно sh Mono.usingПри включении сигнала отмены - PullRequest
0 голосов
/ 01 мая 2020

мы пишем реактивное приложение WebFlux, которое в основном получает ресурс, выполняет объем работы при закрытии и в конце разблокирует ресурс либо с начальной версией, либо с обновленной версией после закрытия.

Пример:

Mono<ProductLock> lock = service.lock()

Mono.usingWhen(lock,
            (ProductLock state) -> service.doLongOperationAsync(state),
            (ProductLock state) -> service.unlock(state))

ProductLock - это определение заблокированного продукта, означающее, что мы можем выполнять операции через HTTP API, включая несколько микросервисов.

service.doLongOperationAsync() - вызывает несколько API-интерфейсов HTTP, которые ВСЕГДА ОЖИДАЮТСЯ завершиться запущен (сбой - это нормально, но если операция началась - ее нужно завершить, поскольку HTTP-вызов не может быть отменен)

service.unlock() - операция ДОЛЖНА быть вызвана только после успешного или неудачного выполнения doLongOperationAsyn c.

В счастливом сценарии ios все работает как положено: при успешном разблокировании продукта, при сбое также при разблокировке.

Проблемы возникают, когда клиент, который вызывает наш сервис (SOAP UI , POSTMAN, любой реальный клиент), разрывает соединение или время ожидания - сигнал отмены генерируется и поднимается до Бове код. На этом этапе все в service.doLongOperationAsync останавливается и service.unlock вызывается при отмене асинхронно.

Вопрос: как мы можем предотвратить это. Требования:

  1. после запуска doLongOperationAsync - он должен завершиться sh
  2. service.unlock(state) - должен вызываться ТОЛЬКО после doLongOperationAsync, даже при отмене.

Spring Boot repro

MRE:

@Bean
public RouterFunction<ServerResponse> route() {
    return RouterFunctions.route().GET("/work", request -> processRequest()
        .flatMap(res -> ServerResponse.ok().bodyValue(res))).build();
}

private Mono<String> processRequest() {
    //Need unlock to execute exactly after doWorkAsync in any case
    return Mono.usingWhen(lock(), this::doWorkAsync, this::unlock)
        .doOnNext((id) -> System.out.println("Request processed:" + id))
        .doOnCancel(() -> System.out.println("Request cancelled"));
}

private Mono<UUID> lock() {
    return Mono.defer(() -> Mono.just(UUID.randomUUID())
        .doOnNext(id -> System.out.println("Locked:" + id)));
}

//Need this to finish no matter what
private Mono<String> doWorkAsync(UUID lockID) {
    return Mono.just(lockID).map(UUID::toString)
        .doOnNext(id -> System.out.println("Start working on:" + lockID))
        .delayElement(Duration.ofSeconds(10))
        .doOnNext(id -> System.out.println("Finished work on:" + id))
        // Should never be called
        .doOnCancel(() -> System.out.println("Processing cancelled:" + lockID));
}

private Mono<Void> unlock(UUID lockID) {
    return Mono.fromRunnable(() -> System.out.println("Unlocking:" + lockID));
}

1 Ответ

0 голосов
/ 11 мая 2020

Очевидно, что для варианта использования работает Mono / Flux create с стоком. Таким образом, он инициируется при подписке, но по-прежнему не отменяется даже при запросе отмены:

Mono.create(sink -> {
   doWorkAsync().subscribe(sink::next, sink::error, null, sink.currentContext())
})

В случае использования, целое usingWhen должно быть помещено в create

...