RX Java Последовательное выполнение наблюдаемого - PullRequest
0 голосов
/ 12 апреля 2020

У меня есть несколько функций, которые возвращают Observable<String>. Каждая функция выполняет команду в файловой системе. Мне нужно выполнить каждую функцию одну за другой и получить вывод функции в наблюдаемой. В конце я хочу один Observable<String>, который содержит выходные данные всех функций в порядке вызовов функций

По отдельности, каждая функция работает должным образом, но мне нужно правильно объединить вывод.

Я пробовал Observable.concatArray (func1, func2, ...) так:

    return Observable.concatArray(
        func1(),
        func2(),
        func3(), 
        func4()
    );

, но это просто сохраняет последовательность событий наблюдаемой. Не последовательность функций. Я имею в виду, если func1 испускает события A и A 'и func2 испускают B и B', у меня будет A-> A '-> B-> B'. Но func2 запустится сразу после func1. Это вызывает у меня проблемы с тем, что func1 необходимо завершить до того, как func2 может запуститься.

Первая функция создает каталог в файловой системе через maven. Итак, долгосрочное задание. Второй, записывает файл в этот каталог. Но concatArray запускает второй сразу после первого. И команда терпит неудачу, потому что каталог не существует в настоящее время.

Есть ли способ избежать чего-то уродливого, подобного этому:

Subject<String> result = PublishSubject.create();
Observable<String> func1Obs = funct1(); 
Observable<String> func2Obs = funct2(); 

func1Obs.subscribe(output -> result.onNext(output));
func1Obs.onDoComplete(() -> {
    func2Obs.subscribe(output -> result.onNext(output);
}
return result;

1 Ответ

1 голос
/ 12 апреля 2020

Как и Suggest Progman, ошибка не была в concatArray, это метод, который нужно использовать. Проблема заключалась в том, что в моем списке функций я использовал такой код:

public Observable<String> func1() {
    Subject<String> result = PublishSubject.create();
    String output = dosomething()
    result.onNext(output);
}

Проблема здесь в том, что функция doSomething () вызывается сразу при создании наблюдаемой.

Решение состоит в том, чтобы использовать либо Observable.create(), если вам нужно onNext, onComplete и т. Д. c ...:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.create( result -> {
        String output = dosomething()
        result.onNext(output);   
    });
}

или Observable.defer(), если вам просто нужно дождаться подписки:

public Observable<String> func1() {
    // See how we wrap our instruction inside create method
    return Observable.defer( () -> dosomething());
}

После этого вы можете позвонить:

return Observable.concatArray(
    func1(),
    func2(),
    func3(), 
    func4()
);
...