RxJava2 Игнорировать элемент, если он в данный момент существует в потоке - PullRequest
0 голосов
/ 21 мая 2019

У меня есть случай, когда мне нужно сбросить / игнорировать элемент, если он в настоящее время представлен в реактивном потоке. Например:

fun ignoreDuplicatesExample() {

    val publishSubject: PublishSubject<Long> = PublishSubject.create()
    publishSubject.observeOn(Schedulers.single()).distinct().subscribe({
        Thread.sleep(1000)
        println("onNext: $it")
    }, {
        error("$it")
    })

    publishSubject.onNext(1)
    publishSubject.onNext(2)
    publishSubject.onNext(3)

    publishSubject.onNext(1) // should be ignored
    publishSubject.onNext(2) // should be ignored
    publishSubject.onNext(3) // should be ignored

    Thread.sleep(10_000)
    publishSubject.onNext(1) // by this time it should be already consumed, so it need to be allowed to emit it again
    publishSubject.onNext(4)

    Thread.sleep(10_000)
    println("exit")
}

Выход:

onNext: 1
onNext: 2
onNext: 3
onNext: 4
exit

Но я ожидал увидеть:

onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 4
exit

Итак, кто-нибудь знает, как этого добиться с помощью RxJava2?

1 Ответ

0 голосов
/ 23 мая 2019

Вы не можете сделать это прямо, потому что верхняя цепочка потоков не должна знать нисходящие потоки, если она «потребляет» d или нет.(Представьте, что подписчиков несколько.) Если вы хотите это сделать, вам нужно взаимодействовать с переменной, находящейся вне потока.

...