Я хотел бы использовать PublishProcessor, который будет работать так:
- подается с некоторыми данными инициализации (случайным образом, когда пользователь прокручивает представление - в этом примере целое число),
- выполняет некоторую фоновую работу (загружает данные на основе данных инициализации)
- при завершении загрузки уведомляет о вновь загруженном типе данных (в примере String)
Он должен быть все время готов к приему и обработке новых данных инициализации.
Я подготовил простой код для тестирования, но он не работает.
PublishProcessor processor = PublishProcessor.create();
processor.map(new Function<Integer, String>() {
@Override
public String apply(Integer o) throws Exception {
Thread.sleep(1000);
DevLog.d("test","executing "+o);
return String.valueOf(o)+"aaa";
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
Subscription sub;
@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
DevLog.d("test","onsubscribe "+s);
sub = s;
sub.request(1);
}
@Override
public void onNext(String s) {
DevLog.d("test","next "+s);
sub.request(1);
}
@Override
public void onError(Throwable t) {
DevLog.d("test","error "+t);
}
@Override
public void onComplete() {
DevLog.d("test","complete");
}
});
processor.onNext(666);
processor.onNext(123);
processor.onNext(456);
DevLog.d("test","nextsent");
Все, что я получаю в logcat, это:
Я бы предпочел:
- nextsent
- выполнение 666
- выполнение 123
- выполнение 456
- следующий 666aaa
- следующий 123aaa
- следующий 456aaa