Я читаю книгу с примером шаблона публикации / подписки с использованием интерфейсов, предоставляемых классом java.util.concurrent.Flow. У меня есть несколько вопросов, ни один из которых не имеет ответов в объяснении кода, и Javadoc для меня слишком абстрактен.
Реализация подписчика:
class DemoSubscriber<T> implements Flow.Subscriber<T> {
private String name;
private Flow.Subscription subscription;
public DemoSubscriber(String name){ this.name = name; }
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
public void onNext(T item) {
System.out.println(name + " received: " + item);
this.subscription.request(1);
}
public void onError(Throwable ex){ ex.printStackTrace();}
public void onComplete() { System.out.println("Completed"); }
}
Реализация подписки
class DemoSubscription<T> implements Flow.Subscription {
private final Flow.Subscriber<T> subscriber;
private final ExecutorService executor;
private Future<?> future;
private T item;
public DemoSubscription(Flow.Subscriber subscriber, ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public void request(long n) {
future = executor.submit(()-> this.subscriber.onNext(item));
}
public synchronized void cancel() {
if (future != null && !future.isCancelled()) {
this.future.cancel(true);
}
}
}
Связи между подписчиком, подпиской и издателем
void demoSubscribe(SubmissionPublisher<Integer> publisher, ExecutorService execService, String subscriberName){
DemoSubscriber<Integer> subscriber = new DemoSubscriber<>(subscriberName);
DemoSubscription subscription = new DemoSubscription(subscriber, execService);
subscriber.onSubscribe(subscription);
publisher.subscribe(subscriber);
}
Использованиевсе это
ExecutorService execService = ForkJoinPool.commonPool();
DemoSubscriber demoSubscriber;
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
demoSubscribe(publisher, execService, "One");
demoSubscribe(publisher, execService, "Two");
demoSubscribe(publisher, execService, "Three");
IntStream.range(1, 50).forEach(publisher::submit);
} finally { // shut down execService
Вот мои вопросы о методе Flow.Subscription.request (long n) и его реализации:
- В чем смысл запроса?
- Как n не влияет на процесс?
- Почему реализация создает асинхронную задачу для вызова Subscriber.onNext ()?
- Не является ли ответственность издателя вызывать Subscriber.onNext()?
Источник: Java 11 CookBook, 2-е издание, Ник Самойлов и Мохаммед Санаулла.