Как отметил @akarnokd в комментариях, созданный вами PublishSubject
немедленно испускает любое значение, переданное ему методом onNext
. Это происходит независимо от того, подписано ли на данный момент что-либо. Он предназначен в первую очередь для того, чтобы помочь преодолеть разрыв между императивным или основанным на обратном вызове кодом и реактивным кодом.
То, что вам нужно, - это Observable
, который начинает выполнять некоторый синхронный код, когда на него что-то подписывается. Observable.create
- это один из способов создания такого экземпляра, но его использование может быть громоздким для правильного использования.
Более удобный способ создания того, что вам нужно, - Observable.fromPublisher
. Он принимает Publisher
в качестве аргумента. Publisher
сама является функцией, которой передается экземпляр Subscriber
всякий раз, когда Observer
подписывается на Observable
, созданный fromPublisher
, и позволяет вам отправлять события непосредственно этому Observer
.
Код, который вы хотите, будет выглядеть примерно так:
fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
return Observable.fromPublisher { subscriber ->
for (x in 1..1_000) {
val fakeReport = genFakeReport()
subscriber.onNext(fakeReport)
Thread.sleep(1)
}
subscriber.onComplete()
}
}
fun main() {
/** supply whatever logic you want to generate a fake [Report] */
fun genFakeReport(): Report = TODO()
val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}
Это будет корректно выдавать значения после подписки на экземпляр Observable
, возвращаемый generateReportStream
. Кроме того, на один и тот же экземпляр можно сделать больше подписок, и каждая из них будет выдавать значения последовательности fre sh, используя одни и те же логики c.