RXjava непрерывные потоки данных - PullRequest
0 голосов
/ 09 мая 2018

Как правильно в RxJava подписываться на наблюдаемые, которые постоянно получают события из разных источников в неизвестное время.

Например: скажем, у нас есть задачи, которые получены от сервера, и вызовы к серверу могут быть начаты из ряда различных областей (например, вызванные push-уведомлением, событием опроса, взаимодействием с пользователем и т. Д.).

Единственное, что нас волнует в пользовательском интерфейсе, это то, что мы уведомлены о задачах, которые были получены. Нам все равно, откуда они берутся. Я действительно хочу начать наблюдение за деятельностью, и модель обновляет наблюдателя по мере необходимости

Я реализовал приведенный ниже класс, который делает то, что я хочу, но я не уверен, что это правильно или RxJava уже отвечает за что-то подобное.

Этот класс эффективно создает в ConnectableObservable, в котором много наблюдателей могут подписаться на один Observable (убедитесь, что все наблюдатели получают один и тот же поток). Одна вещь, на которую я обратил внимание, это то, что при вызове наблюдений и подписки может возникнуть неожиданный результат при подписке на ConnectableObservable таким образом, что может быть проблемой, так как класс не может контролировать, кто использует ConnectableObservable.

public class ApiService {

private Emitter<String> myEmitter;
private ConnectableObservable myObservable;

public ApiService() {
    //Create an observable that is simply used to get the emitter.
    myObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            myEmitter = e;
        }
    }).publish();
    //connect must be called here to ensure we have an instance of the emitter
    // before we have any subscribers
    myObservable.connect();

}

/**
 * This method returns the observable that all observers will subscribe to
 *
 * @return
 */
public Observable<String> getObservable() {
    return myObservable;
}

/**
 * This method is used to simulate a value that has been received from
 * an unknown source
 *
 * @param value
 */
public void run(final String value) {
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            myEmitter.onNext(value);
            //api call
        }
    }).subscribe();
}

}

Мне также любопытно, есть ли какие-либо проблемы с утечками памяти, делающими это таким образом, предполагая, что каждый наблюдатель, подписавшийся на одного наблюдателя, удаляется в соответствующее время.

Это класс с рефакторингом после просмотра ответа Боба

public class ApiService {

private PublishProcessor<String> myProcessor = PublishProcessor.create();

public void subscribe(Subscriber<String> subscriber) {
    myProcessor.subscribe(subscriber);
}

/**
 * This method is used to simulate a value that has been received from
 * an unknown source
 *
 * @param value
 */
public void run(final String value) {
    myProcessor.onNext(value);
}

} * * тысяча двадцать-один

1 Ответ

0 голосов
/ 09 мая 2018

Используйте Subject (RxJava) или Processor (RxJava 2) для подписки. Затем вы должны подписать тему для каждого источника наблюдения. В конце концов, вы подпишетесь на эту тему и получите общий поток выбросов.

В качестве альтернативы, вы можете использовать Relay, чтобы изолировать наблюдателей, расположенных ниже по течению, от любых onComplete() или onError(), которые могут исходить от восходящего потока. Это лучший выбор, когда любая из наблюдаемых может завершиться раньше, чем другие.

...