Мне нужно выполнить несколько заданий в кластере, только по одному за раз.Поскольку моя команда использует Hazelcast, я получил решение, основанное на реализации Hazelcast ILock .Для цели вопроса я собираюсь сделать обобщение по этому поводу.Предположим, у нас есть следующие интерфейсы (которые могут быть легко реализованы, например, Hazelcast или Reddison (Redis)):
public interface MyDistributedLock {
boolean lock();
void unlock();
boolean isLockedByCurrentThread();
}
public interface MyLockDistributedFactory {
MyDistributedLock getLock(String name);
}
И lock метод ожидания, если блокировка не может быть получена:
private Mono<Void> lock(String name, Publisher<?> publisher, MyLockDistributedFactory myLockFactory) {
// important to release lock on the same thread as
// it was aquired
Scheduler scheduler = Schedulers.newSingle(name.toLowerCase());
return Mono.defer(() -> Mono.just(myLockFactory.getLock(name)))
publishOn(scheduler)
.doOnNext(MyDistributedLock::lock)
.doOnNext(lock -> LOGGER.info("Process acquired lock for resource {}", name))
.flatMapMany(lock -> Flux.from(publisher))
.publishOn(scheduler)
.doFinally(signalType -> {
MyDistributedLock lock = myLockFactory.getLock(name);
if (signalType == SignalType.CANCEL) {
// cancel ignores publishOn
scheduler.schedule(() -> {
lock.unlock();
LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
});
} else if (lock.isLockedByCurrentThread()) {
lock.unlock();
LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
}
})
.then();
}
И пример какой-то работы
private Mono<Void> someJobRunEveryOneHourOnEveryNodeInCluster() {
MyLockDistributedFactory hazelcast = ...;
return lock("some-job", Flux.just(1,2), hazelcast)
.repeatWhen(afterOneHour());
}
Интересно, является ли это хорошим подходом использования реактора Проекта (и правильной реализации) или это должно быть сделано по-другому.Пожалуйста, совет.