Публикация данных на java 9 Поток подписчикам таким образом, что их будет использовать только один подписчик - PullRequest
0 голосов
/ 01 мая 2018

Есть ли способ опубликовать данные для подписчиков таким образом, чтобы их получал только один подписчик? Я пытаюсь добиться того, чтобы модель издателя подписчика работала как очередь с несколькими читателями, но с одним издателем. Как только издатель опубликует данные, первый подписчик, который их получит, будет единственным, кто их обработает.

Заранее спасибо !!!

Ответы [ 2 ]

0 голосов
/ 02 мая 2018

Обычен случай, когда только один подписчик должен получать каждый элемент данных. Например, подписчиками могут быть соединения с базой данных и элементы данных - запросы к базе данных, а издатель - центральная точка входа во весь пул соединений. Это другой протокол обмена данными, поэтому использование интерфейсов из j.u.c.Flow может привести к путанице.

Как правило, эти интерфейсы могут использоваться для этого протокола мастер-работник, но есть небольшое, но важное отличие: подписчики не должны запрашивать более одного элемента данных одновременно. В противном случае один работник может взять несколько предметов, а другие - без работы. Таким образом, метод Subscription#request() может быть удален из интерфейса. Предполагается, что посредством подписки подписчик соглашается принять один элемент данных. Как только этот элемент передается подписчику, подписчик отписывается. Это позволяет не сканировать список подписок, пытающихся найти приемлемого подписчика (как в реализации @Roman Puchkovskiy), но передать следующий элемент данных первому подписчику. Как только абоненту нужно больше данных, он снова подписывается. Именно так рабочие потоки в пуле потоков запрашивают следующие задачи.

Поскольку метод cancel() остается единственным методом в Subscription, мы можем заменить его новым методом Publisher#cancel(Subscriber) и полностью исключить интерфейс Subscription. Затем метод Subscriber#onSubscribe(Subscription) заменяется методом Subscriber#onSubscribe(Publisher).

Я занимаюсь разработкой асинхронной библиотеки (она еще не производственного качества), которая содержит решение для вашего варианта использования: класс PickPoint .

0 голосов
/ 01 мая 2018

В реактивных потоках (по крайней мере, в их java.util.concurrent.Flow воплощении) подписчики просто запрашивают данные, и только издатель контролирует, как опубликовать эти данные.

Единственная универсальная реализация Flow.Publisher, существующая в Java 9, - это SubmissionPublisher, которая следует стандартному пабу / подпрограмме публикации любого опубликованного элемента всем подписчикам. Я не нашел простого способа взломать SubmissionPublisher, чтобы сделать его доступным только для одного подписчика.

Но вы можете попытаться написать свою собственную реализацию Flow.Publisher, что-то вроде этого:

class QueueLikePublisher<T> implements Publisher<T> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        // subscribing: adding a new subscription to the list
        QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    public void submit(T item) {
        // we got some data: looking for non-completed and demanding
        // subscription and give it the data item

        for (QueueLikeSubscription<? super T> subscription : subscriptions) {
            if (!subscription.completed && subscription.demand > 0) {
                subscription.offer(item);
                // we just give it to one subscriber; probaly offer() call needs
                // to be wrapped in a try/catch
                break;
            }
        }
    }

    static class QueueLikeSubscription<T> implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final ExecutorService executor;
        volatile int demand = 0;
        volatile boolean completed = false;

        QueueLikeSubscription(Subscriber<? super T> subscriber,
                ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        public synchronized void request(long n) {
            if (n != 0 && !completed) {
                if (n < 0) {
                    IllegalArgumentException ex = new IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                } else {
                    // just extending the demand
                    demand += n;
                }
            }
        }

        public synchronized void cancel() {
            completed = true;
        }

        Future<?> offer(T item) {
            return executor.submit(() -> {
                try {
                    subscriber.onNext(item);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            });
        }
    }
}

Публикует элемент первому подписчику, который еще не завершен (например, отменен) и имеет ненулевой спрос.

Обратите внимание, что этот код является просто наброском для нечетических целей, чтобы продемонстрировать идею . Например, он, вероятно, должен содержать больше обработки исключений (например, обработку RejectedExecutionException).

...