У меня есть решение для эксклюзивных вызовов удаленного сервиса с такими же параметрами.Может быть, это может быть полезно в вашем случае.
Он основан на немедленном tryLock
с ошибкой, если ресурс занят, и Mono.retryWhen
на "ожидание" освобождения.
Итак, у меня есть LockData
класс для метаданных блокировки
public final class LockData {
// Lock key to identify same operation (same cache key, for example).
private final String key;
// Unique identifier for equals and hashCode.
private final String uuid;
// Date and time of the acquiring for lock duration limiting.
private final OffsetDateTime acquiredDateTime;
...
}
LockCommand
интерфейс - это абстракция операций блокировки на LockData
public interface LockCommand {
Tuple2<Boolean, LockData> tryLock(LockData lockData);
void unlock(LockData lockData);
...
}
UnlockEventsRegistry
интерфейс - абстракция для сборщика событий слушателей разблокировки.
public interface UnlockEventsRegistry {
// initialize event listeners collection when acquire lock
Mono<Void> add(LockData lockData);
// notify event listeners and remove collection when release lock
Mono<Void> remove(LockData lockData);
// register event listener for given lockData
Mono<Boolean> register(LockData lockData, Consumer<Integer> unlockEventListener);
}
И Lock
класс может обернуть исходный Mono блокировкой, разблокировкой и обернуть CacheMono Writer с разблокировкой.
public final class Lock {
private final LockCommand lockCommand;
private final LockData lockData;
private final UnlockEventsRegistry unlockEventsRegistry;
private final EmitterProcessor<Integer> unlockEvents;
private final FluxSink<Integer> unlockEventSink;
public Lock(LockCommand lockCommand, String key, UnlockEventsRegistry unlockEventsRegistry) {
this.lockCommand = lockCommand;
this.lockData = LockData.builder()
.key(key)
.uuid(UUID.randomUUID().toString())
.build();
this.unlockEventsRegistry = unlockEventsRegistry;
this.unlockEvents = EmitterProcessor.create(false);
this.unlockEventSink = unlockEvents.sink();
}
...
public final <T> Mono<T> tryLock(Mono<T> source, Scheduler scheduler) {
return Mono.fromCallable(() -> lockCommand.tryLock(lockData))
.subscribeOn(scheduler)
.flatMap(isLocked -> {
if (isLocked.getT1()) {
return unlockEventsRegistry.add(lockData)
.then(source
.switchIfEmpty(unlock().then(Mono.empty()))
.onErrorResume(throwable -> unlock().then(Mono.error(throwable))));
} else {
return Mono.error(new LockIsNotAvailableException(isLocked.getT2()));
}
});
}
public Mono<Void> unlock(Scheduler scheduler) {
return Mono.<Void>fromRunnable(() -> lockCommand.unlock(lockData))
.then(unlockEventsRegistry.remove(lockData))
.subscribeOn(scheduler);
}
public <KEY, VALUE> BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> unlockAfterCacheWriter(
BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> cacheWriter) {
Objects.requireNonNull(cacheWriter);
return cacheWriter.andThen(voidMono -> voidMono.then(unlock())
.onErrorResume(throwable -> unlock()));
}
public final <T> UnaryOperator<Mono<T>> retryTransformer() {
return mono -> mono
.doOnError(LockIsNotAvailableException.class,
error -> unlockEventsRegistry.register(error.getLockData(), unlockEventSink::next)
.doOnNext(registered -> {
if (!registered) unlockEventSink.next(0);
})
.then(Mono.just(2).map(unlockEventSink::next)
.delaySubscription(lockCommand.getMaxLockDuration()))
.subscribe())
.doOnError(throwable -> !(throwable instanceof LockIsNotAvailableException),
ignored -> unlockEventSink.next(0))
.retryWhen(errorFlux -> errorFlux.zipWith(unlockEvents, (error, integer) -> {
if (error instanceof LockIsNotAvailableException) return integer;
else throw Exceptions.propagate(error);
}));
}
}
Теперь, если мне нужно обернуть Mono с CacheMono и блокировать, яможно сделать так:
private Mono<String> getCachedLockedMono(String cacheKey, Mono<String> source, LockCommand lockCommand, UnlockEventsRegistry unlockEventsRegistry) {
Lock lock = new Lock(lockCommand, cacheKey, unlockEventsRegistry);
return CacheMono.lookup(CACHE_READER, cacheKey)
// Lock and double check
.onCacheMissResume(() -> lock.tryLock(Mono.fromCallable(CACHE::get).switchIfEmpty(source)))
.andWriteWith(lock.unlockAfterCacheWriter(CACHE_WRITER))
// Retry if lock is not available
.transform(lock.retryTransformer());
}
Вы можете найти код и тесты с примерами на GitHub