Слияние источников горячего потока - PullRequest
0 голосов
/ 24 марта 2019

В Spring Boot 2 с Reactor я пытаюсь объединить два Flux горячих источника.Тем не менее, merge только когда-либо сообщает о первом из двух Flux параметров в merge.Как мне заставить merge распознать вторую Flux.

В приведенном ниже примере System.err в B-2 даже не печатается, когда outgoing1a является первым параметром.Если я сделаю outgoing2 первым, то A-2 не будет печататься.

Ниже приведен полный пример;

package com.example.demo;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class Weather {
String city;
Integer temperature;

public Weather(String city, Integer temperature) {
    this.city = city;
    this.temperature = temperature;
}

@Override
public String toString() {
    return "Weather [city=" + city + ", temperature=" + temperature + "]";
}

public static void main(String[] args) {

    BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
    BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();

    // Assume Spring @Repository "A-1"
    new Thread(() -> {
        for (int d = 1; d < 1000; d += 1) {
            for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
                queue.add(new Weather(s, d));
                try { Thread.sleep(250); } catch (InterruptedException e) {}
            }
        }
    }).start(); 

    // Assume Spring @Repository "B-1"
    new Thread(() -> {
        for (int d = 1; d < 1000; d += 1) {
            for (String s: new String[] {"MOS", "TLV"}) {
                queue2.add(new Weather(s, d));
                try { Thread.sleep(1000); } catch (InterruptedException e) {}
            }
        }
    }).start();

    // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR
    Flux<Weather> outgoing1 = Flux.<Weather>create(
        sink -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    sink.next(queue.take());
                    System.err.println("1 " + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            sink.complete();
        }
    ).publishOn(Schedulers.newSingle("outgoing-1"));

    // Assume Spring @Service "B-2" = real-time MOS, TLV
    Flux<Weather> outgoing2 = Flux.<Weather>create(
            sink -> {
                for (int i = 0; i < 1000; i++) {
                    try {
                        sink.next(queue2.take());
                        System.err.println("2 " + queue2.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                sink.complete();
            }
        ).publishOn(Schedulers.newSingle("outgoing-2"));

    // Assume Spring @Service "A-3" = 5 second summary of LDN, NYC, PAR, ZUR
    Flux<Weather> outgoing1a = Flux.from(outgoing1)   
        .groupBy(c -> c.city)
        .flatMap(g -> g
            .sample(Duration.ofSeconds(5))
        )
        .log("C");

    // Assume Spring @Service "C" - merges "A-3" and "B-2"
    // only prints outgoing1a
    Flux.merge(outgoing1a, outgoing2).subscribe(System.out::println); 

    // only prints outgoing2
    //Flux.merge(outgoing2, outgoing1a).subscribe(System.out::println); 

}
}

1 Ответ

3 голосов
/ 25 марта 2019

Здесь есть несколько вещей.

  1. Обратите внимание на следующую рекомендацию оператора .merge ...

Обратите внимание, что слияние предназначено для работы с асинхронными или конечными источниками.При работе с бесконечным источником, который еще не публикуется в выделенном планировщике, вы должны изолировать этот источник в своем собственном планировщике, так как в противном случае объединение будет пытаться истощить его перед подпиской на другой источник.

Ваши исходящие потоки используют .publishOn, но это касается только операторов, связанных после оператора .publishOn.то есть это не влияет ни на что до .publishOn.В частности, это не влияет на поток, в котором выполняется код в lambda, переданный в Flux.create.Это можно увидеть, если добавить .log() до .publishOn в каждом из исходящих потоков.

Ваша лямбда, переданная в Flux.create, вызывает блокировкуметод (queue.take).

Поскольку вы вызываете subscribe(...) в объединенном потоке в потоке main, ваша лямбда, переданная в Flux.create, выполняется в потоке mainи блокирует это.

Самое простое решение - использовать .subscribeOn вместо .publishOn, чтобы ваш код в лямбда-выражении, переданный в Flux.create, работал в другом потоке (отличном от main).Это предотвратит блокировку потока main и позволит чередовать объединенный вывод из обоих исходящих потоков.

...