Потоковый кеш RxJava для однократной блокировки - PullRequest
1 голос
/ 19 октября 2019

Ниже приведен мой фрагмент кода.

Я знаю, что вы не должны блокировать cachedFlowable, как это, но это всего лишь пример.

Он застрял на blockingGetline.

Если я заменим singleOrError на singleElement, код все равно застрянет. Если я заменю singleOrError на firstElement, код больше не будет зависать.

Может кто-нибудь объяснить мне, почему это так?

    public static void main(String[] args) {
        final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
        cachedFlowable
                .doOnNext(i -> {
                    System.out.println("doOnNext " + i);
                    final Integer j = cachedFlowable.singleOrError().blockingGet();
                    System.out.println("after blockingGet " + j);
                })
                .blockingSubscribe();
    }

1 Ответ

2 голосов
/ 19 октября 2019

Причина, по которой он блокируется оператором singleX, заключается в том, что такие операторы ожидают возможного выброса 2-го элемента, но, поскольку вы блокируете их, любой второй элемент или завершение из основного источника не может быть выполнено. С firstX они заботятся только о самом первом элементе, поэтому разблокируют практически сразу, что позволяет завершить работу источника.

Так что да, вы не должны использовать методы блокировки в таких потоках, а вместо этого использовать flatMap или concatMap для каждого подпотока элемента:

var cache = Flowable.just(1).cache();

cache
.doOnNext(i -> System.out.println("doOnNext " + i))
.concatMapSingle(item -> cache.firstOrError())
.doOnNext(j -> System.out.println("after " + j))
.blockingSubscribe();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...