Rx Java подписка PublishSubject ничего не дает. Что-то не так с подпиской? - PullRequest
0 голосов
/ 09 июля 2020

Learning Rx Java, поэтому буду благодарен за любой совет.

Необходимо получить объекты, когда расчет будет готов, поэтому я сделал PublishSubject в методе моделей:

public PublishSubject<BaseUnit> exec(int inputNumber) {

    if (unitList.size() > 0) {
        for (BaseUnit unit : unitList) {
            unit.setInProgress();
        }
    }

    PublishSubject<BaseUnit> subject = PublishSubject.create();

    list = new ArrayList<>();

    populateList(inputNumber).
            subscribeOn(Schedulers.from(Executors.newFixedThreadPool(ThreadPool.getPoolSize())))
            .subscribe(calculatedList -> {
                list = calculatedList;

                for (List<Integer> elem : list) {
                    for (ListOperationName operationName : ListOperationName.values()) {
                        ListUnit unit = new ListUnit(operationName, elem, 0);
                        calculate(unit);
                        unitList.add(unit);
                        subject.onNext(unit);
                    }
                }

            }, error -> Log.d("ERROR", error.toString()));

    return subject;
}


public Observable<ArrayList<List<Integer>>> populateList(int inputNumber) {
    return Observable.fromCallable(() -> {

        ArrayList<List<Integer>> list = new ArrayList<>();

        Integer[] populatedArray = new Integer[inputNumber];
        Arrays.fill(populatedArray, insertValue);

        list.add(new ArrayList<>(Arrays.asList(populatedArray)));
        list.add(new LinkedList<>(Arrays.asList(populatedArray)));
        list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));

        return list;
    });
}

А потом пытаюсь подписаться в ведущем:

public void calculate(int inputNumber) {

    fragment.showAllProgressBars();

    repository.exec(inputNumber)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(ThreadPool.getPoolSize())))
            .subscribe(unit -> {
                Log.d("PRESENTER RESULT", unit.toString());
                fragment.setCellText(unit.getViewId(), unit.getTimeString());

            }, error -> Log.d("PRESENTER ERROR", error.toString()));
}

Это мне ничего не дает. Но если я использую ReplaySubject - он дает мне все результаты, но, похоже, использует только один поток. Поэтому я предполагаю, что с подпиской я сделал что-то не так, и это должно быть где-то раньше. Мне нужно использовать именно PublishSubject для получения результатов, поскольку они готовы с использованием нескольких потоков.

Как это исправить? А может есть другая проблема?

...