RxJava2 Как выполнить некоторые операции, когда завершено архивирование двух источников данных - PullRequest
0 голосов
/ 18 декабря 2018

Я обрабатываю два монофонических wav-файла и хочу сохранить их как стереофонический файл.Каждый моно транслируется и подписывается следующим образом.Моно данные могут иметь разную длину.Я использовал zip оператор для обработки пар.Проблема в том, что мне нужно позвонить:

fileWriter.stopRecord();

, когда обработка обеих данных завершена.Поэтому я хотел сделать это методом zip onCompleted, но он не работает, потому что после обработки одного моно (вызванного onCompleted) он вызывает zip onCompleted, а другой моно все еще обрабатывает.Итак, как я могу убедиться, что zip onCompleted вызывается после обработки обоих моно?Неважно, что одно значение пары может быть нулевым.

        Observable<HermesMessages.AudioChunk> speakerAudioObservable = this.channel.getSpeakerAudio();

        speakerAudioObservable.subscribe(
                sample -> {
                    log.trace("Processing speaker !!!" );
                },
                error -> {
                    log.error("Error during writing to file", error);
                }, () -> {
                    log.trace("Completed writing speaker audio to file.");
                });

        Observable<HermesMessages.AudioChunk> dronAudioObservable = channel.getDronAudio();
        dronAudioObservable.subscribe(
                sample -> {
                    log.trace("Processing dron !!!" );

                },
                error -> {
                    log.error("Error during writing to file", error);
                },
                () -> {
                    log.trace("Completed writing dron audio to file.");
                });

        Observable.zip(speakerAudioObservable, dronAudioObservable, Pair::create)
                .subscribe(
                        pair -> {
                            fileWriter.streamSecData(pair.first().getContent());
                            fileWriter.workerJob();

                            fileWriter.streamData(pair.second().getContent());
                            fileWriter.workerJob();
                        },
                        error -> {
                            log.error("zip Error during writing to file", error);
                            fileWriter.stopRecord();

                        }, () -> {
                            log.trace("zip Completed writing speaker audio to file.");
                            fileWriter.stopRecord();
                        });

Ответы [ 2 ]

0 голосов
/ 20 декабря 2018

Я провел простое тестирование решения с несколькими целыми числами. Надеюсь, что это решение поможет вам 2 потока целых чисел с разным размером

 public void run(String... args) throws Exception {
    Observable<Integer> integerObservable1 = Observable.just(1, 2, 3,4);
    Observable<Integer> integerObservable2 = Observable.just(1, 2);

    runObservable(integerObservable1, integerObservable2)
    .map(o -> {
        System.out.println(o);
        return o;
    }).subscribe();
}

private Observable<Object> runObservable(Observable<Integer> integerObservable1, Observable<Integer> integerObservable2) {
    return Observable.merge(
            Observable.zip(integerObservable1, integerObservable2, (BiFunction<Integer, Integer, Object>) (integer, integer2) -> integer + " " + integer2),

            Single.zip(integerObservable1.toList(), integerObservable2.toList(), (integers, integers2) -> Math.min(integers.size(), integers2.size()))
                    .flatMapObservable(integer -> Observable.merge(integerObservable1.skip(integer), integerObservable2.skip(integer)))
                    .map(integer -> integer + " " + "static value")
    );
}

вывод кода:

1 1

2 2

3 статическое значение

4 статическое значение

поскольку поток 1 имеет 2 элемента, а поток 2 имеет 4 элемента, вы оставили его статическим как нольне поддерживается в rxjava, но вы можете выбрать, может ли статическое значение быть нулевым или последним значением

0 голосов
/ 19 декабря 2018

Вы не можете сделать это с zip, потому что поток завершается, когда один из наблюдаемых zip заканчивает испускать элементы.(источник: http://reactivex.io/documentation/operators/zip.html).

Но вы можете использовать другой оператор CombineLatest. Он объединяет элемент, испускаемый наблюдаемой, с последним элементом, испускаемым другой наблюдаемой. Недостатком может быть то, что у вас естьчтобы проверить, был ли элемент уже "обработан" (уже создана пара из этого элемента). Поэтому вам нужно изменить метод Pair::create. Также нет необходимости подписываться на каждую наблюдаемую индивидуально.

Источник:http://reactivex.io/documentation/operators/combinelatest.html

...