Публикация / подписка - реализация Flow.Subscription.request (длинный n) - PullRequest
0 голосов
/ 12 ноября 2019

Я читаю книгу с примером шаблона публикации / подписки с использованием интерфейсов, предоставляемых классом java.util.concurrent.Flow. У меня есть несколько вопросов, ни один из которых не имеет ответов в объяснении кода, и Javadoc для меня слишком абстрактен.

Реализация подписчика:

class DemoSubscriber<T> implements Flow.Subscriber<T> {
    private String name;
    private Flow.Subscription subscription;
    public DemoSubscriber(String name){ this.name = name; }
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }
    public void onNext(T item) {
        System.out.println(name + " received: " + item);
        this.subscription.request(1);
    }
    public void onError(Throwable ex){ ex.printStackTrace();}
    public void onComplete() { System.out.println("Completed"); }
}

Реализация подписки

class DemoSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<T> subscriber;
    private final ExecutorService executor;
    private Future<?> future;
    private T item;
    public DemoSubscription(Flow.Subscriber subscriber, ExecutorService executor) {
        this.subscriber = subscriber;
        this.executor = executor;
    }
    public void request(long n) {
        future = executor.submit(()-> this.subscriber.onNext(item));
    }
    public synchronized void cancel() {
        if (future != null && !future.isCancelled()) {
            this.future.cancel(true);
        }
    }
}

Связи между подписчиком, подпиской и издателем

void demoSubscribe(SubmissionPublisher<Integer> publisher, ExecutorService execService, String subscriberName){
    DemoSubscriber<Integer> subscriber = new DemoSubscriber<>(subscriberName);
    DemoSubscription subscription = new DemoSubscription(subscriber, execService);
    subscriber.onSubscribe(subscription);
    publisher.subscribe(subscriber);
}

Использованиевсе это

ExecutorService execService = ForkJoinPool.commonPool();
    DemoSubscriber demoSubscriber;
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
        demoSubscribe(publisher, execService, "One");
        demoSubscribe(publisher, execService, "Two");
        demoSubscribe(publisher, execService, "Three");
        IntStream.range(1, 50).forEach(publisher::submit);
    } finally { // shut down execService

Вот мои вопросы о методе Flow.Subscription.request (long n) и его реализации:

  1. В чем смысл запроса?
  2. Как n не влияет на процесс?
  3. Почему реализация создает асинхронную задачу для вызова Subscriber.onNext ()?
  4. Не является ли ответственность издателя вызывать Subscriber.onNext()?

Источник: Java 11 CookBook, 2-е издание, Ник Самойлов и Мохаммед Санаулла.

...