Порядок событий, получаемых подписчиками публикации sh Тема - PullRequest
0 голосов
/ 05 мая 2020

У меня есть тема publi sh с несколькими подписчиками:

Вот класс:

class Real {

    private val publisher: PublishSubject<String> = PublishSubject.create()

    fun doPublish() {
        for (i in 1 until 20) {
            publisher.onNext("$i Hello")
        }
        publisher.onComplete()
    }

    fun doSubscribe() {
        publisher.subscribe {
            println("Subscriber1 $it")
        }

        publisher.subscribe {
            println("Subscriber2 $it")
        }

        publisher.subscribe {
            println("Subscriber3 $it")
        }

    }
}

Я звоню doSubscribe() перед тем, как позвонить doPublish() Результат выглядит как следует:

 Task :Main.main()
Subscriber1 1 Hello
Subscriber2 1 Hello
Subscriber3 1 Hello
Subscriber1 2 Hello
Subscriber2 2 Hello
Subscriber3 2 Hello
Subscriber1 3 Hello
Subscriber2 3 Hello
Subscriber3 3 Hello
Subscriber1 4 Hello
Subscriber2 4 Hello
Subscriber3 4 Hello
Subscriber1 5 Hello
Subscriber2 5 Hello
Subscriber3 5 Hello
Subscriber1 6 Hello
Subscriber2 6 Hello
Subscriber3 6 Hello
Subscriber1 7 Hello
Subscriber2 7 Hello
Subscriber3 7 Hello
Subscriber1 8 Hello
Subscriber2 8 Hello
Subscriber3 8 Hello
Subscriber1 9 Hello
Subscriber2 9 Hello
Subscriber3 9 Hello
Subscriber1 10 Hello
Subscriber2 10 Hello
Subscriber3 10 Hello
Subscriber1 11 Hello
Subscriber2 11 Hello
Subscriber3 11 Hello
Subscriber1 12 Hello
Subscriber2 12 Hello
Subscriber3 12 Hello
Subscriber1 13 Hello
Subscriber2 13 Hello
Subscriber3 13 Hello
Subscriber1 14 Hello
Subscriber2 14 Hello
Subscriber3 14 Hello
Subscriber1 15 Hello
Subscriber2 15 Hello
Subscriber3 15 Hello
Subscriber1 16 Hello
Subscriber2 16 Hello
Subscriber3 16 Hello
Subscriber1 17 Hello
Subscriber2 17 Hello
Subscriber3 17 Hello
Subscriber1 18 Hello
Subscriber2 18 Hello
Subscriber3 18 Hello
Subscriber1 19 Hello
Subscriber2 19 Hello
Subscriber3 19 Hello

Согласно вышеуказанной программе первый подписчик получает событие первым, за которым следуют второе и третье, это точно в соответствии с порядком подписки.

Гарантирован ли такой порядок исполнения? Поскольку я не могу найти соответствующую документацию по этому поводу.

1 Ответ

0 голосов
/ 06 мая 2020

Обратите внимание на реализацию PublishSubject :

Что происходит при подписке?

PublishDisposable создается и добавляется к массиву подписчиков "через метод добавления (b [n] = ps;)

Теперь у PublishSubject есть массив подписчиков, который соблюдает порядок вставки

@Override
protected void subscribeActual(Observer<? super T> t) {
    PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
    t.onSubscribe(ps);
    if (add(ps)) {
        // if cancellation happened while a successful add, the remove() didn't work
        // so we need to do it again
        if (ps.isDisposed()) {
            remove(ps);
        }
    } else {
        ...
    }
}

boolean add(PublishDisposable<T> ps) {
    for (;;) {
        PublishDisposable<T>[] a = subscribers.get();
        if (a == TERMINATED) {
            return false;
        }

        int n = a.length;
        @SuppressWarnings("unchecked")
        PublishDisposable<T>[] b = new PublishDisposable[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = ps;

        if (subscribers.compareAndSet(a, b)) {
            return true;
        }
    }
}

Теперь источник испускает новое значение через onNext. Метод onNext показывает вызов onNext-вызова подписчиков. Массив подписчиков перебирается от 0 до n. Следовательно, подписчики вызываются в порядке вставки , потому что по контракту onNext должен вызываться последовательно.

Observables должны отправлять уведомления наблюдателям последовательно (не параллельно). Они могут отправлять эти уведомления из разных потоков, но между уведомлениями должна быть формальная связь «происходит до». (http://reactivex.io/documentation/contract.html)

@Override
public void onNext(T t) {
    ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
    for (PublishDisposable<T> pd : subscribers.get()) {
        pd.onNext(t);
    }
}
...