Неблокирующий реентранлок с реактором - PullRequest
0 голосов
/ 26 октября 2018

Мне нужно ограничить количество клиентов, обрабатывающих один и тот же ресурс одновременно
, поэтому я попытался реализовать аналог

lock.lock();
try {
     do work
} finally {
    lock.unlock();
}

, но неблокирующим образом с библиотекой Reactor.И у меня есть что-то вроде этого.

Но у меня есть вопрос:
Есть ли лучший способ сделать это
или, может быть, кто-то знает о реализованном решении
или, возможно, это не таккак это должно быть сделано в реактивном мире, и есть ли другой подход к таким проблемам?

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class NonblockingLock {
    private static final Logger LOG = LoggerFactory.getLogger(NonblockingLock.class);

    private String currentOwner;
    private final AtomicInteger lockCounter = new AtomicInteger();
    private final FluxSink<Boolean> notifierSink;
    private final Flux<Boolean> notifier;
    private final String resourceId;

    public NonblockingLock(String resourceId) {
        this.resourceId = resourceId;
        EmitterProcessor<Boolean> processor = EmitterProcessor.create(1, false);
        notifierSink = processor.sink(FluxSink.OverflowStrategy.LATEST);
        notifier = processor.startWith(true);
    }

    /**
     * Nonblocking version of
     * <pre><code>
     *     lock.lock();
     *     try {
     *         do work
     *     } finally {
     *         lock.unlock();
     *     }
     * 
* * / public Flux processWithLock (владелец строки, @Nullable Duration tryLockTimeout, Flux работа) {Objects.requireNonNull (владелец, "владелец");вернуть notifier.filter (it -> tryAcquire (owner)) .next () .transform (locked -> tryLockTimeout == null? locked: locked.timeout (tryLockTimeout)) .doOnSubscribe (s -> LOG.debug ("пытаетсяполучить блокировку для resourceId: {}, по владельцу: {} ", resourceId, владельцу)) .doOnError (err -> LOG.error (" не удалось получить блокировку для resourceId: {}, по владельцу: {}, ошибка:{} ", resourceId, владелец, err.getMessage ())) .flatMapMany (это -> работа) .doFinally (s -> {if (tryRelease (owner)) {LOG.debug (" отпустить блокировку resourceId: {},owner: {} ", resourceId, owner); notifierSink.next (true);}});} private boolean tryAcquire (String owner) {boolean приобретенный;синхронизирован (это) {if (currentOwner == null) {currentOwner = owner;} приобретенный = currentOwner.equals (владелец);if (приобретенный) {lockCounter.incrementAndGet ();}} возврат приобретенного;} private boolean tryRelease (String owner) {boolean release = false;synchronized (this) {if (currentOwner.equals (owner)) {int count = lockCounter.decrementAndGet ();if (count == 0) {currentOwner = null;освобожден = правда;}}} возвращение освобождено;}}

и это, как я полагаю, должно работать

@Test
public void processWithLock() throws Exception {
    NonblockingLock lock = new NonblockingLock("work");
    String client1 = "client1";
    String client2 = "client2";
    Flux<String> requests = getWork(client1, lock)
            //emulate async request for resource by another client
            .mergeWith(Mono.delay(Duration.ofMillis(300)).flatMapMany(it -> getWork(client2, lock)))
            //emulate async request for resource by the same client
            .mergeWith(Mono.delay(Duration.ofMillis(400)).flatMapMany(it -> getWork(client1, lock)));
    StepVerifier.create(requests)
            .expectSubscription()
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client1)
            .expectNext(client2)
            .expectNext(client2)
            .expectNext(client2)
            .expectComplete()
            .verify(Duration.ofMillis(5000));
}
private static Flux<String> getWork(String client, NonblockingLock lock) {
    return lock.processWithLock(client, null,
            Flux.interval(Duration.ofMillis(300))
                    .take(3)
                    .map(i -> client)
                    .log(client)
    );
}

1 Ответ

0 голосов
/ 28 октября 2018

У меня есть решение для эксклюзивных вызовов удаленного сервиса с такими же параметрами.Может быть, это может быть полезно в вашем случае.

Он основан на немедленном tryLock с ошибкой, если ресурс занят, и Mono.retryWhen на "ожидание" освобождения.

Итак, у меня есть LockData класс для метаданных блокировки

public final class LockData {
    // Lock key to identify same operation (same cache key, for example).
    private final String key;
    // Unique identifier for equals and hashCode.
    private final String uuid;
    // Date and time of the acquiring for lock duration limiting.
    private final OffsetDateTime acquiredDateTime;
    ...
}

LockCommand интерфейс - это абстракция операций блокировки на LockData

public interface LockCommand {

    Tuple2<Boolean, LockData> tryLock(LockData lockData);

    void unlock(LockData lockData);
    ...
}

UnlockEventsRegistry интерфейс - абстракция для сборщика событий слушателей разблокировки.

public interface UnlockEventsRegistry {
    // initialize event listeners collection when acquire lock
    Mono<Void> add(LockData lockData);

    // notify event listeners and remove collection when release lock
    Mono<Void> remove(LockData lockData);

    // register event listener for given lockData
    Mono<Boolean> register(LockData lockData, Consumer<Integer> unlockEventListener);
}

И Lock класс может обернуть исходный Mono блокировкой, разблокировкой и обернуть CacheMono Writer с разблокировкой.

public final class Lock {
    private final LockCommand lockCommand;
    private final LockData lockData;
    private final UnlockEventsRegistry unlockEventsRegistry;
    private final EmitterProcessor<Integer> unlockEvents;
    private final FluxSink<Integer> unlockEventSink;

    public Lock(LockCommand lockCommand, String key, UnlockEventsRegistry unlockEventsRegistry) {
        this.lockCommand = lockCommand;
        this.lockData = LockData.builder()
                .key(key)
                .uuid(UUID.randomUUID().toString())
                .build();
        this.unlockEventsRegistry = unlockEventsRegistry;
        this.unlockEvents = EmitterProcessor.create(false);
        this.unlockEventSink = unlockEvents.sink();
    }

    ...

    public final <T> Mono<T> tryLock(Mono<T> source, Scheduler scheduler) {
        return Mono.fromCallable(() -> lockCommand.tryLock(lockData))
                .subscribeOn(scheduler)
                .flatMap(isLocked -> {
                    if (isLocked.getT1()) {
                        return unlockEventsRegistry.add(lockData)
                                .then(source
                                        .switchIfEmpty(unlock().then(Mono.empty()))
                                        .onErrorResume(throwable -> unlock().then(Mono.error(throwable))));
                    } else {
                        return Mono.error(new LockIsNotAvailableException(isLocked.getT2()));
                    }
                });
    }

    public Mono<Void> unlock(Scheduler scheduler) {
        return Mono.<Void>fromRunnable(() -> lockCommand.unlock(lockData))
                .then(unlockEventsRegistry.remove(lockData))
                .subscribeOn(scheduler);
    }

    public <KEY, VALUE> BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> unlockAfterCacheWriter(
            BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> cacheWriter) {
        Objects.requireNonNull(cacheWriter);
        return cacheWriter.andThen(voidMono -> voidMono.then(unlock())
                .onErrorResume(throwable -> unlock()));
    }

    public final <T> UnaryOperator<Mono<T>> retryTransformer() {
        return mono -> mono
                .doOnError(LockIsNotAvailableException.class,
                        error -> unlockEventsRegistry.register(error.getLockData(), unlockEventSink::next)
                                .doOnNext(registered -> {
                                    if (!registered) unlockEventSink.next(0);
                                })
                                .then(Mono.just(2).map(unlockEventSink::next)
                                        .delaySubscription(lockCommand.getMaxLockDuration()))
                                .subscribe())
                .doOnError(throwable -> !(throwable instanceof LockIsNotAvailableException),
                        ignored -> unlockEventSink.next(0))
                .retryWhen(errorFlux -> errorFlux.zipWith(unlockEvents, (error, integer) -> {
                    if (error instanceof LockIsNotAvailableException) return integer;
                    else throw Exceptions.propagate(error);
                }));
    }
}

Теперь, если мне нужно обернуть Mono с CacheMono и блокировать, яможно сделать так:

private Mono<String> getCachedLockedMono(String cacheKey, Mono<String> source, LockCommand lockCommand, UnlockEventsRegistry unlockEventsRegistry) {
    Lock lock = new Lock(lockCommand, cacheKey, unlockEventsRegistry);

    return CacheMono.lookup(CACHE_READER, cacheKey)
            // Lock and double check
            .onCacheMissResume(() -> lock.tryLock(Mono.fromCallable(CACHE::get).switchIfEmpty(source)))
            .andWriteWith(lock.unlockAfterCacheWriter(CACHE_WRITER))
            // Retry if lock is not available
            .transform(lock.retryTransformer());
}

Вы можете найти код и тесты с примерами на GitHub

...