мы пишем реактивное приложение WebFlux, которое в основном получает ресурс, выполняет объем работы при закрытии и в конце разблокирует ресурс либо с начальной версией, либо с обновленной версией после закрытия.
Пример:
Mono<ProductLock> lock = service.lock()
Mono.usingWhen(lock,
(ProductLock state) -> service.doLongOperationAsync(state),
(ProductLock state) -> service.unlock(state))
ProductLock
- это определение заблокированного продукта, означающее, что мы можем выполнять операции через HTTP API, включая несколько микросервисов.
service.doLongOperationAsync()
- вызывает несколько API-интерфейсов HTTP, которые ВСЕГДА ОЖИДАЮТСЯ завершиться запущен (сбой - это нормально, но если операция началась - ее нужно завершить, поскольку HTTP-вызов не может быть отменен)
service.unlock()
- операция ДОЛЖНА быть вызвана только после успешного или неудачного выполнения doLongOperationAsyn c.
В счастливом сценарии ios все работает как положено: при успешном разблокировании продукта, при сбое также при разблокировке.
Проблемы возникают, когда клиент, который вызывает наш сервис (SOAP UI , POSTMAN, любой реальный клиент), разрывает соединение или время ожидания - сигнал отмены генерируется и поднимается до Бове код. На этом этапе все в service.doLongOperationAsync
останавливается и service.unlock
вызывается при отмене асинхронно.
Вопрос: как мы можем предотвратить это. Требования:
- после запуска
doLongOperationAsync
- он должен завершиться sh service.unlock(state)
- должен вызываться ТОЛЬКО после doLongOperationAsync
, даже при отмене.
Spring Boot repro
MRE:
@Bean
public RouterFunction<ServerResponse> route() {
return RouterFunctions.route().GET("/work", request -> processRequest()
.flatMap(res -> ServerResponse.ok().bodyValue(res))).build();
}
private Mono<String> processRequest() {
//Need unlock to execute exactly after doWorkAsync in any case
return Mono.usingWhen(lock(), this::doWorkAsync, this::unlock)
.doOnNext((id) -> System.out.println("Request processed:" + id))
.doOnCancel(() -> System.out.println("Request cancelled"));
}
private Mono<UUID> lock() {
return Mono.defer(() -> Mono.just(UUID.randomUUID())
.doOnNext(id -> System.out.println("Locked:" + id)));
}
//Need this to finish no matter what
private Mono<String> doWorkAsync(UUID lockID) {
return Mono.just(lockID).map(UUID::toString)
.doOnNext(id -> System.out.println("Start working on:" + lockID))
.delayElement(Duration.ofSeconds(10))
.doOnNext(id -> System.out.println("Finished work on:" + id))
// Should never be called
.doOnCancel(() -> System.out.println("Processing cancelled:" + lockID));
}
private Mono<Void> unlock(UUID lockID) {
return Mono.fromRunnable(() -> System.out.println("Unlocking:" + lockID));
}