Реактивное программирование - выполнение заданий в кластере - PullRequest
0 голосов
/ 26 января 2019

Мне нужно выполнить несколько заданий в кластере, только по одному за раз.Поскольку моя команда использует Hazelcast, я получил решение, основанное на реализации Hazelcast ILock .Для цели вопроса я собираюсь сделать обобщение по этому поводу.Предположим, у нас есть следующие интерфейсы (которые могут быть легко реализованы, например, Hazelcast или Reddison (Redis)):

public interface MyDistributedLock {

    boolean lock();

    void unlock();

    boolean isLockedByCurrentThread();
}

public interface MyLockDistributedFactory {

    MyDistributedLock getLock(String name);

}

И lock метод ожидания, если блокировка не может быть получена:

private Mono<Void> lock(String name, Publisher<?> publisher, MyLockDistributedFactory myLockFactory) {
    // important to release lock on the same thread as
    // it was aquired    
    Scheduler scheduler = Schedulers.newSingle(name.toLowerCase());

    return Mono.defer(() -> Mono.just(myLockFactory.getLock(name)))
        publishOn(scheduler)
         .doOnNext(MyDistributedLock::lock)
         .doOnNext(lock -> LOGGER.info("Process acquired lock for resource {}", name))
         .flatMapMany(lock -> Flux.from(publisher))
         .publishOn(scheduler) 
         .doFinally(signalType -> {
              MyDistributedLock lock = myLockFactory.getLock(name);
              if (signalType == SignalType.CANCEL) {
                 // cancel ignores publishOn 
                 scheduler.schedule(() -> { 
                    lock.unlock();
                    LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
                 });
              } else if (lock.isLockedByCurrentThread()) {
                 lock.unlock();
                 LOGGER.info("Process released lock for resource {} due to signal type {}", name, signalType);
              }
          })
          .then();
}

И пример какой-то работы

private Mono<Void> someJobRunEveryOneHourOnEveryNodeInCluster() {
    MyLockDistributedFactory hazelcast = ...;
    return lock("some-job", Flux.just(1,2), hazelcast)
                .repeatWhen(afterOneHour());
}

Интересно, является ли это хорошим подходом использования реактора Проекта (и правильной реализации) или это должно быть сделано по-другому.Пожалуйста, совет.

1 Ответ

0 голосов
/ 28 января 2019

это правильный подход , когда с использованием Reactor, потому что вы позаботились о смещении блокирующей части в выделенный Scheduler / Thread.

Но я бы сказал, что взаимноПодобный эксклюзивный код не очень подходит для реактивного программирования в целом: вы теряете одно из ключевых преимуществ: делать больше с меньшим количеством потоков, вы рискуете заблокировать другие части приложения, если забудете publishOn выделенный поток и т. д....

...