Как правильно в RxJava подписываться на наблюдаемые, которые постоянно получают события из разных источников в неизвестное время.
Например: скажем, у нас есть задачи, которые получены от сервера, и вызовы к серверу могут быть начаты из ряда различных областей (например, вызванные push-уведомлением, событием опроса, взаимодействием с пользователем и т. Д.).
Единственное, что нас волнует в пользовательском интерфейсе, это то, что мы уведомлены о задачах, которые были получены. Нам все равно, откуда они берутся. Я действительно хочу начать наблюдение за деятельностью, и модель обновляет наблюдателя по мере необходимости
Я реализовал приведенный ниже класс, который делает то, что я хочу, но я не уверен, что это правильно или RxJava уже отвечает за что-то подобное.
Этот класс эффективно создает в ConnectableObservable, в котором много наблюдателей могут подписаться на один Observable (убедитесь, что все наблюдатели получают один и тот же поток). Одна вещь, на которую я обратил внимание, это то, что при вызове наблюдений и подписки может возникнуть неожиданный результат при подписке на ConnectableObservable таким образом, что может быть проблемой, так как класс не может контролировать, кто использует ConnectableObservable.
public class ApiService {
private Emitter<String> myEmitter;
private ConnectableObservable myObservable;
public ApiService() {
//Create an observable that is simply used to get the emitter.
myObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
myEmitter = e;
}
}).publish();
//connect must be called here to ensure we have an instance of the emitter
// before we have any subscribers
myObservable.connect();
}
/**
* This method returns the observable that all observers will subscribe to
*
* @return
*/
public Observable<String> getObservable() {
return myObservable;
}
/**
* This method is used to simulate a value that has been received from
* an unknown source
*
* @param value
*/
public void run(final String value) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
myEmitter.onNext(value);
//api call
}
}).subscribe();
}
}
Мне также любопытно, есть ли какие-либо проблемы с утечками памяти, делающими это таким образом, предполагая, что каждый наблюдатель, подписавшийся на одного наблюдателя, удаляется в соответствующее время.
Это класс с рефакторингом после просмотра ответа Боба
public class ApiService {
private PublishProcessor<String> myProcessor = PublishProcessor.create();
public void subscribe(Subscriber<String> subscriber) {
myProcessor.subscribe(subscriber);
}
/**
* This method is used to simulate a value that has been received from
* an unknown source
*
* @param value
*/
public void run(final String value) {
myProcessor.onNext(value);
}
} * * тысяча двадцать-один