Вопрос новичка по Rx Java и PublishSubject - PullRequest
0 голосов
/ 31 марта 2020

У меня есть вопрос относительно PublishSubject в RxJava. Я создал фиктивный объект PublishSubject, который испускает некоторые объекты. Вот мой код:

override fun generate(exportRequest: ExportRequest): Observable<Report> {
        val faker = Faker()
        val dummyPublisher = PublishSubject.create<Report>()
        for(x in 1..1_000){
            val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
            val report = Report(dataToExport)
            sddPublisher.onNext(report)
            Thread.sleep(1)
        }
        dummyPublisher.onComplete()
        return dummyPublisher
    }

При подписке никакие объекты не выдаются. Например, для этого ничего не печатается:

... // somewhere in the code
reportStrategy.generate(exportRequest).subscribe { report: Report? ->
     println(report)
 }

Может быть, я что-то упустил. Любая помощь будет оценена

1 Ответ

0 голосов
/ 29 апреля 2020

Как отметил @akarnokd в комментариях, созданный вами PublishSubject немедленно испускает любое значение, переданное ему методом onNext. Это происходит независимо от того, подписано ли на данный момент что-либо. Он предназначен в первую очередь для того, чтобы помочь преодолеть разрыв между императивным или основанным на обратном вызове кодом и реактивным кодом.

То, что вам нужно, - это Observable, который начинает выполнять некоторый синхронный код, когда на него что-то подписывается. Observable.create - это один из способов создания такого экземпляра, но его использование может быть громоздким для правильного использования.

Более удобный способ создания того, что вам нужно, - Observable.fromPublisher. Он принимает Publisher в качестве аргумента. Publisher сама является функцией, которой передается экземпляр Subscriber всякий раз, когда Observer подписывается на Observable, созданный fromPublisher, и позволяет вам отправлять события непосредственно этому Observer.

Код, который вы хотите, будет выглядеть примерно так:

fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
    return Observable.fromPublisher { subscriber ->
        for (x in 1..1_000) {
            val fakeReport = genFakeReport()
            subscriber.onNext(fakeReport)
            Thread.sleep(1)
        }
        subscriber.onComplete()
    }
}

fun main() {
    /** supply whatever logic you want to generate a fake [Report] */
    fun genFakeReport(): Report = TODO()
    val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}

Это будет корректно выдавать значения после подписки на экземпляр Observable, возвращаемый generateReportStream. Кроме того, на один и тот же экземпляр можно сделать больше подписок, и каждая из них будет выдавать значения последовательности fre sh, используя одни и те же логики c.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...