Потяните реализацию в RxJava - PullRequest
0 голосов
/ 13 июня 2018

Я занимаюсь разработкой библиотеки тестирования, и я хотел бы реализовать реализацию по запросу с использованием RxJava

У меня есть поток (все доступные тесты) с несколькими подписчиками, подписанными (все тестирующие устройства), и мне бы хотелось, чтобыкогда каждый подписчик заканчивает обработку запроса элемента новым элементом в потоке, и все элементы потока должны обрабатываться только одним потребителем.

Мне интересно, может ли это быть реализовано с использованием горячей наблюдаемой ииспользуя методы противодавления, но я не совсем уверен в этом: (

Есть идеи?

1 Ответ

0 голосов
/ 14 июня 2018

Наконец, с помощью @akarnokd я нашел решение этой проблемы.Вот код:

        Subscriber sub1=new Subscriber<Integer>() {
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(1);
            System.out.println("onSubscribe done");
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("Sub 1 Processing: "+t);
            sleep(1000);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }
    };

    Subscriber sub2=new Subscriber<Integer>() {
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(1);
            System.out.println("onSubscribe done");
        }

        @Override
        public void onNext(Integer t) {
            System.out.println("Sub2 Processing: "+t);
            sleep(500);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }
    };

    // ***** Magic happens here!! *****
    DispatchWorkProcessor<Integer> dwp = DispatchWorkProcessor.create(Schedulers.io());
    Flowable.range(1, 20).subscribe(dwp);
    dwp.subscribe(sub1);
    dwp.subscribe(sub2);
    // ********************************

    sleep(Integer.MAX_VALUE);
}

Хитрость заключалась в использовании request(1) для реализации подхода извлечения и DispatchWorkProcessor для использования элементов один раз из потока.Несмотря на то, что скорость одного потребителя вдвое больше, чем на выходе, ожидается:

onSubscribe done
onSubscribe done
Sub 1 Processing: 1
Sub2 Processing: 2
Sub2 Processing: 3
Sub 1 Processing: 4
Sub2 Processing: 5
Sub2 Processing: 6
Sub 1 Processing: 7
Sub2 Processing: 8
Sub2 Processing: 9
Sub 1 Processing: 10
Sub2 Processing: 11
Sub2 Processing: 12
Sub 1 Processing: 13
Sub2 Processing: 14
Sub2 Processing: 15
Sub 1 Processing: 16
Sub2 Processing: 17
Sub2 Processing: 18
Sub 1 Processing: 19
Sub2 Processing: 20

Все элементы потока обрабатываются один раз, а Sub 2 обрабатывает двойное число элементов, что приятно!

...