Реагирующие потоки Java 9: ​​принадлежит ли один подписчик одному издателю - PullRequest
2 голосов
/ 20 марта 2019

Мне интересно, может ли издатель с реактивными потоками безопасно предположить, что подписки принадлежат ему только , и вызвать java.util.concurrent.Flow.Subscriber#onComplete на нем, если издатель выходит из строя (например, закрывается).Пример кода ниже демонстрирует дилемму (очевидно, это просто некий синтетический код, демонстрирующий вопрос):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimePublisher implements Flow.Publisher<Long> {

    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    private final ConcurrentLinkedQueue<Flow.Subscriber<? super Long>> subscribersList = new ConcurrentLinkedQueue<>();


    private TimePublisher() {

    }

    public static TimePublisher newInstance() {
        TimePublisher timePublisher = new TimePublisher();
        timePublisher.startTickScheduler();

        return timePublisher;
    }

    private void startTickScheduler() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // does not make too much sense: just for the sake of the example
            final long currentTimeMillis = System.currentTimeMillis();
            subscribersList.forEach(sub -> sub.onNext(currentTimeMillis));
        }, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Long> subscriber) {

        subscribersList.add(subscriber);

        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // no-op in this sample
            }

            @Override
            public void cancel() {
                subscribersList.remove(subscriber);
            }
        });
    }

    public void stop() {
        // the publisher can be stopped from the outside: after that it will
        // definitely not emit any next items.
        scheduledExecutorService.shutdown();

        // QUESTION: can we assume that a Subscriber is subscribed to only this Publisher?
        // if it is subscribed to another publisher, the following is illegal, as onNext
        // could potentially be called by another Publisher...
        subscribersList.forEach(Flow.Subscriber::onComplete);

        subscribersList.clear();
    }
}
  • , когда вызывается TimePublisher#stop, этот конкретный издатель абсолютно не испустит onNext вызовы, следовательно, вызов onComplete представляется законным выбором
  • однако, если подписчик подписан также на другого издателя, то вызов onComplete может быть недопустимым, поскольку другой издатель все еще может излучатьэлементы.

1 Ответ

3 голосов
/ 20 марта 2019

Документация для Subscriber говорит

Методы в этом интерфейсе вызываются в строгом последовательном порядке для каждой подписки Flow.Subscription .

onComplete в частности:

Метод вызывается, когда известно, что никаких дополнительных вызовов методов подписчика для подписки, которая еще не завершена по ошибке, не будет, после чего никакие другие методы подписчика не будут вызваны по подписке .Если этот метод генерирует исключение, результирующее поведение не определено.

Так что другие Subscription s могут продолжать вызывать методы законно.

Flow Документация гласит, что несколько Subscription s в реализации Subscriber возможен, но не рекомендуется:

Поскольку вызовы методов подписчика для данного потока. Подписка строго упорядочена, для этих методов нет необходимости использовать блокировки или летучие компоненты, если толькоподписчик поддерживает несколько подписок (в этом случае лучше вместо этого определить несколько подписчиков, каждый со своей подпиской).

...