Получение перестановки для реактивных потоков - PullRequest
0 голосов
/ 07 февраля 2019

У меня есть два реактивных потока, которые должны работать асинхронно.В результате я должен вызвать функцию foo(), которая будет принимать два аргумента и вызываться со всеми возможными перестановками элементов из двух исходных реактивных потоков.И это должно провалиться все, если какое-либо исключение произошло в течение всего процесса.Каков наилучший способ реализовать это с использованием активной зоны реактора?

Пример:

    String[] aInitial = {"a","b","c"};
    String[] bInitial = {"0","1"};

    Flux<String> fluxA = Flux.fromArray(aInitial);
    Flux<String> fluxB = Flux.fromArray(bInitial);

    ...

    private void foo(String a, String b){
        System.out.println(a + ", " + b); 
    }

Ожидаемый результат (порядок не имеет значения):

a, 0 a, 1 b, 0 b, 1 c,0 с, 1

1 Ответ

0 голосов
/ 07 февраля 2019

Вместо того, чтобы вызывать действие (в вашем случае System.out.println) в методе, выполняющем преобразование, разделите его на одну функцию, которая объединяет элементы из обоих, и другую, которая воздействует на эти данные.

public static void main(String[] args) {

    String[] aInitial = {"a", "b", "c"};
    String[] bInitial = {"0", "1"};

    Flux<String> fluxA = Flux.fromArray(aInitial);
    Flux<String> fluxB = Flux.fromArray(bInitial);

    fluxA
            .flatMap(input1 -> fluxB.map(input2 -> foo(input1, input2)))
            .doOnNext(System.out::println)
            .blockLast();
}

private static String foo(String a, String b) {
    return a + ", " + b;
}

Как вы можете видеть, мы перебираем первый поток и используем второй поток для создания вложенного цикла.

Обратите внимание, что это не будет работать для горячих потоков, здесь будут работать только холодные потокинам нужно повторно подписаться на второй поток для каждого элемента, который излучает первый.

...