Learning Rx Java, поэтому буду благодарен за любой совет.
Необходимо получить объекты, когда расчет будет готов, поэтому я сделал PublishSubject
в методе моделей:
public PublishSubject<BaseUnit> exec(int inputNumber) {
if (unitList.size() > 0) {
for (BaseUnit unit : unitList) {
unit.setInProgress();
}
}
PublishSubject<BaseUnit> subject = PublishSubject.create();
list = new ArrayList<>();
populateList(inputNumber).
subscribeOn(Schedulers.from(Executors.newFixedThreadPool(ThreadPool.getPoolSize())))
.subscribe(calculatedList -> {
list = calculatedList;
for (List<Integer> elem : list) {
for (ListOperationName operationName : ListOperationName.values()) {
ListUnit unit = new ListUnit(operationName, elem, 0);
calculate(unit);
unitList.add(unit);
subject.onNext(unit);
}
}
}, error -> Log.d("ERROR", error.toString()));
return subject;
}
public Observable<ArrayList<List<Integer>>> populateList(int inputNumber) {
return Observable.fromCallable(() -> {
ArrayList<List<Integer>> list = new ArrayList<>();
Integer[] populatedArray = new Integer[inputNumber];
Arrays.fill(populatedArray, insertValue);
list.add(new ArrayList<>(Arrays.asList(populatedArray)));
list.add(new LinkedList<>(Arrays.asList(populatedArray)));
list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));
return list;
});
}
А потом пытаюсь подписаться в ведущем:
public void calculate(int inputNumber) {
fragment.showAllProgressBars();
repository.exec(inputNumber)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(ThreadPool.getPoolSize())))
.subscribe(unit -> {
Log.d("PRESENTER RESULT", unit.toString());
fragment.setCellText(unit.getViewId(), unit.getTimeString());
}, error -> Log.d("PRESENTER ERROR", error.toString()));
}
Это мне ничего не дает. Но если я использую ReplaySubject
- он дает мне все результаты, но, похоже, использует только один поток. Поэтому я предполагаю, что с подпиской я сделал что-то не так, и это должно быть где-то раньше. Мне нужно использовать именно PublishSubject
для получения результатов, поскольку они готовы с использованием нескольких потоков.
Как это исправить? А может есть другая проблема?