В реактивных потоках (по крайней мере, в их java.util.concurrent.Flow
воплощении) подписчики просто запрашивают данные, и только издатель контролирует, как опубликовать эти данные.
Единственная универсальная реализация Flow.Publisher
, существующая в Java 9, - это SubmissionPublisher
, которая следует стандартному пабу / подпрограмме публикации любого опубликованного элемента всем подписчикам. Я не нашел простого способа взломать SubmissionPublisher
, чтобы сделать его доступным только для одного подписчика.
Но вы можете попытаться написать свою собственную реализацию Flow.Publisher
, что-то вроде этого:
class QueueLikePublisher<T> implements Publisher<T> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();
public synchronized void subscribe(Subscriber<? super T> subscriber) {
// subscribing: adding a new subscription to the list
QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
subscriptions.add(subscription);
subscriber.onSubscribe(subscription);
}
public void submit(T item) {
// we got some data: looking for non-completed and demanding
// subscription and give it the data item
for (QueueLikeSubscription<? super T> subscription : subscriptions) {
if (!subscription.completed && subscription.demand > 0) {
subscription.offer(item);
// we just give it to one subscriber; probaly offer() call needs
// to be wrapped in a try/catch
break;
}
}
}
static class QueueLikeSubscription<T> implements Subscription {
private final Subscriber<? super T> subscriber;
private final ExecutorService executor;
volatile int demand = 0;
volatile boolean completed = false;
QueueLikeSubscription(Subscriber<? super T> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
// just extending the demand
demand += n;
}
}
}
public synchronized void cancel() {
completed = true;
}
Future<?> offer(T item) {
return executor.submit(() -> {
try {
subscriber.onNext(item);
} catch (Exception e) {
subscriber.onError(e);
}
});
}
}
}
Публикует элемент первому подписчику, который еще не завершен (например, отменен) и имеет ненулевой спрос.
Обратите внимание, что этот код является просто наброском для нечетических целей, чтобы продемонстрировать идею . Например, он, вероятно, должен содержать больше обработки исключений (например, обработку RejectedExecutionException
).