Это немного сложно, но все сводится к следующему:
Вызов publish
и autoConnect
приводит к тому, что после подписки создается внутренняя подписка, которая сохраняется даже в случае отмены внешней подписки. Думайте об этом как о двух потоках, соединенных publish
и autoConnect
.
--inner stream--> publish/connect-> --outer stream--> subscriber
Независимо от того, что происходит во внешнем потоке, внутренний поток будет продолжать работать. Это предназначено, потому что у вас есть горячая наблюдаемая и вы можете иметь более одного подписчика. Если один отключается, то другие все еще хотят иметь значения из потока. Другими словами, оператор публикации сообщает источнику, что он может выдавать значения сколько угодно.
--inner stream--> publish/connect-> --outer stream--> subscriber1
--outer stream--> subscriber2
Вы можете проверить это поведение, удалив retry
. Номера 1 и 2 все еще будут напечатаны. Если вы хотите, чтобы источник прекратил создавать значения, вы можете использовать refCount
вместо autoConnect
. refCount
отменяет внутренний поток, если подписчиков больше нет.
Здесь начинается сложная часть: этот поток является синхронным, и потоки являются просто функциями . Это немного сложнее под капотом, но функция подписки внутри оператора повтора работает, пока не завершится внутренний поток. Только тогда он будет переподписываться.
Отличается асинхронными потоками, например создан с Flux.interval
.
Flux.interval(Duration.ofSeconds(1))
.doOnNext(__ -> System.out.println("producing number: " + __))
.publish()
.autoConnect()
.doOnNext((Integer __) -> {
System.out.println("throwing error.");
throw new RuntimeException("aaa");
})
.retry(1)
.subscribe()
Когда вы звоните .subscribe()
, retry
будет звонить по внутренней подписке на doOnNext
и так далее, пока первый doOnNext
не вызовет subscribe
на Flux. Интервал Flux говорит: «Хорошо, я отправлю первое значение через одну секунду», и подписка установлена.
Через одну секунду выдается значение, выдается ошибка, оператор повторной попытки снова подписывается.
В вашем примере Flux уже начинает излучать значения во время вызова подписки, прежде чем подписка будет полностью установлена, так сказать.