RxJava2.1.0: PublishSubject onNext не вызывается при подписке на разные потоки - PullRequest
0 голосов
/ 20 февраля 2019

Обнаружено, что при подписке на сериализованный объект PublishSubject в течение 10-20 мс после вызова событий onNext для субъектов;onNext нового подписчика не вызывается.

В приведенном ниже фрагменте кода;значение для наблюдения [1] задается как «2000», и subseToSubject () вызывается после вызова onNext () для субъекта со значением 1998 [2];мы видим, что, если интервал составляет 10 мс, новые подписчики будут пропускать значение 2000, выдаваемое субъектом;Принимая во внимание, что, если интервал составляет 50 мс или больше, новые подписчики, кажется, получают ожидаемые значения;Это ожидаемое поведение?

Такое поведение наблюдается в RxJava 2.1.0;Кажется, что-то вроде состояния гонки

public class PublishSubjectTest {

    private final Subject<String> singlePropertyUpdateSubject =
            PublishSubject.<String>create().toSerialized();


    public static void main(String[] args) {
        PublishSubjectTest obj= new PublishSubjectTest();
        obj.sendEvents();
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


//[1]
    private final String valueToObserve = "2000";
    private void subscribeToSubject() {
        System.out.println("Subscribing .....");
        io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
                value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println("  Value Received   "+observedValue +" By "+Thread.currentThread() ))
        );



    }

    private io.reactivex.Observable<String> getAndObserve(String value) {
        final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
                //.doOnNext(v-> System.out.println("Received value "+v))
                .filter(v -> v.equals(value))
                .doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
        return observable;
    }


// 50ms >= expected result ;  Anything less than 10ms will fail.
    private void sendEvents() {
        io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value -> {
            String key = value.toString();
            //System.out.println("Adding key "+key);
            singlePropertyUpdateSubject.onNext(key);
//[2]           
 if (value == 1998){
                subscribeToSubject();;
            }
            if (value%100==0) {
                System.out.println(value);
            }

        });
    }

1 Ответ

0 голосов
/ 20 февраля 2019

Хорошо, проблема в PublishSubject;использование ReplaySubject, кажется, решает эту проблему одновременной подписки;также есть проблема с тестовым кодом, вызывающим system.exit (1) при использовании ReplaySubject;Подробное обсуждение предоставляется github.com/ReactiveX/RxJava/issues/6414 -

. Пожалуйста, рассмотрите этот вопрос как закрытый.

...