Мне интересно, может ли издатель с реактивными потоками безопасно предположить, что подписки принадлежат ему только , и вызвать java.util.concurrent.Flow.Subscriber#onComplete
на нем, если издатель выходит из строя (например, закрывается).Пример кода ниже демонстрирует дилемму (очевидно, это просто некий синтетический код, демонстрирующий вопрос):
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TimePublisher implements Flow.Publisher<Long> {
private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
private final ConcurrentLinkedQueue<Flow.Subscriber<? super Long>> subscribersList = new ConcurrentLinkedQueue<>();
private TimePublisher() {
}
public static TimePublisher newInstance() {
TimePublisher timePublisher = new TimePublisher();
timePublisher.startTickScheduler();
return timePublisher;
}
private void startTickScheduler() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
// does not make too much sense: just for the sake of the example
final long currentTimeMillis = System.currentTimeMillis();
subscribersList.forEach(sub -> sub.onNext(currentTimeMillis));
}, 1, 1, TimeUnit.SECONDS);
}
@Override
public void subscribe(Flow.Subscriber<? super Long> subscriber) {
subscribersList.add(subscriber);
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
// no-op in this sample
}
@Override
public void cancel() {
subscribersList.remove(subscriber);
}
});
}
public void stop() {
// the publisher can be stopped from the outside: after that it will
// definitely not emit any next items.
scheduledExecutorService.shutdown();
// QUESTION: can we assume that a Subscriber is subscribed to only this Publisher?
// if it is subscribed to another publisher, the following is illegal, as onNext
// could potentially be called by another Publisher...
subscribersList.forEach(Flow.Subscriber::onComplete);
subscribersList.clear();
}
}
- , когда вызывается
TimePublisher#stop
, этот конкретный издатель абсолютно не испустит onNext
вызовы, следовательно, вызов onComplete
представляется законным выбором - однако, если подписчик подписан также на другого издателя, то вызов
onComplete
может быть недопустимым, поскольку другой издатель все еще может излучатьэлементы.