Есть ли в RxJava2 эквивалент doAfterSubscribe? - PullRequest
0 голосов
/ 04 апреля 2020

У меня есть наблюдаемое, которое передает ответы на все запросы. Я хочу создать фильтр этого наблюдаемого при выполнении запроса, чтобы я мог поделиться результатами с несколькими подписчиками. Ниже приведен пример кода.

    PublishSubject<String> publishSubject = PublishSubject.create();
    Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
            .doOnSubscribe(disposable -> {
                publishSubject.onNext("foobar");
            })
            .replay(1)
            .refCount();

    fooObservable.subscribe(val -> log.info("A val : <{}>", val));

Я использую PublishSubject в качестве фиктивного сервиса, потому что иногда сервис немедленно возвращает ответ.

Я обнаружил, что, потому что там нет текущей подписки, когда есть немедленный результат, мой fooObservable не заполняется. т.е. я не получаю вывод журнала, когда хочу увидеть:

A val : <foobar>

Обратите внимание, что я получаю тот же результат с этим кодом:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
        .replay(1)
        .refCount();

publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));

Так что проблема в том, что fooObservable не подписаться на PublishSubject до тех пор, пока он не будет подписан,

Есть ли способ запустить код сразу после первой подписки на fooObservable?

Редактировать: Я думал о чем-то вроде:

PublishSubject<String> publishSubject = PublishSubject.create();
BehaviorSubject<String> fooObservable = BehaviorSubject.create();
publishSubject.filter(value -> value.startsWith("foo")).subscribe(fooObservable);
publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));

Но тогда у меня есть 2 подписки, и я не уверен, как выполнить очистку, так как подписка после того, как фильтр не возвращает одноразовые.

Редактировать 2: Описание фоновое задание.

У меня есть сторонняя служба, на которую мой код должен подписаться. Этот сервис вызывает метод onResponse в моем коде с параметром, содержащим мой исходный запрос и ответ. Ответ может быть обновлен новым вызовом onResponse в любое время.

Я хочу обернуть, чтобы создать оболочку для этой службы, которая предоставляет метод:

public Observable<Response> getObservable(Request req);

Если запрос соответствует тот, который уже подписан, тогда наблюдаемое должно предоставить самое последнее совпадающее значение сразу при подписке.

Когда нет подписчиков, мне нужно отписаться от службы, которую я упаковываю.

Ответы [ 2 ]

0 голосов
/ 05 апреля 2020

Я думаю, что последнее редактирование вопроса помогает объяснить, что вы пытаетесь сделать, и эта ссылка на GitHub, вероятно, содержит ответ, который вы ищете: https://github.com/ReactiveX/RxJava/issues/4675

Но Я все еще поделюсь своим тестовым кодом. Я закончил тем, что следовал совету по ссылке выше и использовал PublishSubject с .replay(1).refCount().

Допустим, у нас есть сторонний сервисный интерфейс:

interface ThirdPartyService
{
    void subscribe( Consumer<Integer> responseConsumer );

    void unsubscribe();
}

Далее давайте создадим фиктивная реализация, которая будет вызывать потребителя со случайным int до тех пор, пока не отменит подписку:

    // Mock service to emit a random integer once per second, no Rx:
    ThirdPartyService mockService = new ThirdPartyService() {

        Timer timer = new Timer();

        @Override
        public void subscribe( Consumer<Integer> responseConsumer )
        {
            System.out.println( "Subscribe" );
            Random random = new Random();

            TimerTask task = new TimerTask() {
                @Override
                public void run()
                {
                    int i = random.nextInt( 10 );
                    System.out.println( "Producing: " + i );
                    responseConsumer.accept( i );
                }
            };

            timer.schedule( task, 1000, 1000 );
        }

        @Override
        public void unsubscribe()
        {
            System.out.println( "Unsubscribe" );
            timer.cancel();
        }
    };

Далее, фактический конвейер Rx. Допустим, я хочу отфильтровать только нечетные целые числа и отписаться от сервиса, когда нет наблюдателей:

    // Wrap service in a PublishSubject:
    PublishSubject<Integer> subject = PublishSubject.create();
    mockService.subscribe( subject::onNext );

    // Create observable:
    Observable<Integer> observable = subject
            .doFinally( mockService::unsubscribe )
            .filter( i -> i % 2 == 1 )  // Include only odd integers
            .replay( 1 )                // Replay latest to new observers
            .refCount();

Наконец, ручной тест:

    // Subscribe to Observable:
    Disposable sub1 = observable.subscribe( i -> System.out.println( "sub1 got: " + i ));

    // Sleep:
    Thread.sleep( 3300 );

    // Create 2nd Subscriber:
    System.out.println( "adding sub2" );
    Disposable sub2 = observable.subscribe( i -> System.out.println( "sub2 got: " + i ));

    // Sleep:
    Thread.sleep( 3300 );

    // Dispose 2nd Subscriber:
    System.out.println( "disposing sub2" );
    sub2.dispose();

    // Sleep:
    Thread.sleep( 3300 );

    // Dispose 1st Subscriber:
    sub1.dispose();

    // Sleep:
    Thread.sleep( 3300 );

Вывод:

Subscribe
Producing: 1
sub1 got: 1
Producing: 8
Producing: 6
adding sub2
sub2 got: 1
Producing: 3
sub1 got: 3
sub2 got: 3
Producing: 7
sub1 got: 7
sub2 got: 7
Producing: 1
sub1 got: 1
sub2 got: 1
disposing sub2
Producing: 6
Producing: 7
sub1 got: 7
Producing: 1
sub1 got: 1
Unsubscribe
0 голосов
/ 04 апреля 2020

Я думаю publish() и connect() - это то, что вам нужно. Здесь вы можете прочитать больше об этом.

В вашем случае это будет что-то вроде:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject
    .filter(value -> value.startsWith("foo"))
    .publish();

fooObservable.subscribe(val -> log.info("A val : <{}>", val));
Observable<Object> o2 = fooObservable.map { new Object() }
Observable<Object> o3 = fooObservable.map { /* Something here*/ }

Disposable disposable = fooObservable.connect()

Но не забудьте сделать disposable.dispose(), чтобы не было утечки

РЕДАКТИРОВАТЬ

BehaviorSubject<String> publishSubject = BehaviorSubject.create();
Observable<String> fooObservable = publishSubject.filter(value ->         
value.startsWith("foo"));
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
publishSubject.onNext("foobar");
...