повторить операцию в горячем потоке? - PullRequest
0 голосов
/ 29 июня 2018
Flux.just(1, 2, 3)
        .doOnNext(__ -> System.out.println("producing number: " + __))
        .publish()
        .autoConnect()
        .doOnNext((Integer __) -> {
            System.out.println("throwing error.");
            throw new RuntimeException("aaa");
        })
        .retry(1, error -> true)
        .subscribe(number -> System.out.println("I will never be here..."),
                error -> System.out.println("will I be here? " + error),
                () -> System.out.println("completed!"));

Выход:

producing number: 1
throwing error.
producing number: 2
producing number: 3

Ожидаемый результат: (из моей логики - я уверен, что ошибаюсь)

producing number: 1
throwing error.
producing number: 2
throwing error.
producing number: 3

Почему результат отличается от ожидаемого?

1 Ответ

0 голосов
/ 30 июня 2018

Это немного сложно, но все сводится к следующему:

Вызов publish и autoConnect приводит к тому, что после подписки создается внутренняя подписка, которая сохраняется даже в случае отмены внешней подписки. Думайте об этом как о двух потоках, соединенных publish и autoConnect.

--inner stream--> publish/connect-> --outer stream--> subscriber

Независимо от того, что происходит во внешнем потоке, внутренний поток будет продолжать работать. Это предназначено, потому что у вас есть горячая наблюдаемая и вы можете иметь более одного подписчика. Если один отключается, то другие все еще хотят иметь значения из потока. Другими словами, оператор публикации сообщает источнику, что он может выдавать значения сколько угодно.

    --inner stream--> publish/connect-> --outer stream--> subscriber1
                                        --outer stream--> subscriber2

Вы можете проверить это поведение, удалив retry. Номера 1 и 2 все еще будут напечатаны. Если вы хотите, чтобы источник прекратил создавать значения, вы можете использовать refCount вместо autoConnect. refCount отменяет внутренний поток, если подписчиков больше нет.

Здесь начинается сложная часть: этот поток является синхронным, и потоки являются просто функциями . Это немного сложнее под капотом, но функция подписки внутри оператора повтора работает, пока не завершится внутренний поток. Только тогда он будет переподписываться.

Отличается асинхронными потоками, например создан с Flux.interval.

Flux.interval(Duration.ofSeconds(1))
    .doOnNext(__ -> System.out.println("producing number: " + __))
    .publish()
    .autoConnect()
    .doOnNext((Integer __) -> {
        System.out.println("throwing error.");
        throw new RuntimeException("aaa");
    })
    .retry(1)
    .subscribe()

Когда вы звоните .subscribe(), retry будет звонить по внутренней подписке на doOnNext и так далее, пока первый doOnNext не вызовет subscribe на Flux. Интервал Flux говорит: «Хорошо, я отправлю первое значение через одну секунду», и подписка установлена.

Через одну секунду выдается значение, выдается ошибка, оператор повторной попытки снова подписывается.

В вашем примере Flux уже начинает излучать значения во время вызова подписки, прежде чем подписка будет полностью установлена, так сказать.

...