Весенний потребитель WebFlux потонет - PullRequest
0 голосов
/ 03 мая 2020

Вот простое приложение с весенней загрузкой:

@SpringBootApplication
@RestController
public class ReactiveApplication {

    static Flux<String> fluxString;
    static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();

    private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean add(String e) {
            synchronized (this) {
                notify();
            }
            return super.add(e);
        }

        @Override
        public String poll() {
            synchronized (this) {
                if(isEmpty()) {
                    try {
                        wait();
                    } catch (InterruptedException ex) {}
                }
            }
            return super.peek() == null ? "" : super.poll();
        }
    }

    static Consumer<String> consumer = str -> queue.add(str);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    static {
        for(int i = 0; i < 10; i++)
            queue.add("testData " + i + " ");
    }

    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {

        Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
        List<String> list = new ArrayList<>(queue);
        queue.removeAll(queue);

        fluxString = Flux.<String>create(sink -> {
            sink.onRequest(n -> {
                for(int i = 0; i < n; i++) {
                    sink.next(queue.poll());
                }
            }).onCancel(() -> sch.dispose());
        }).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));

        return fluxString;

    }

    @GetMapping("/add")
    public String add( @RequestParam String s) {
        consumer.accept(s);
        return s;
    }

}

Так что в основном это приложение создает поток String. Посещение / захватит всю строку, представленную в строке, а затем объединит все, что добавлено из ресурса /add (игнорируйте «Безопасные методы должны быть идемпотентными»).

То, что я считаю странным, - это когда Я перемещаю public static void main(...) в строку 1, приложение начинает плохо себя вести, и добавление новых значений в /add не имеет никакого эффекта. Я думаю, что должно быть что-то интересное, что делает приложение плохо себя ведет. Любое объяснение?

1 Ответ

0 голосов
/ 05 мая 2020

Я закончил тем, что использовал это, которое прекрасно работает:

@SpringBootApplication
@RestController
public class ReactiveApplication {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
    private static Consumer<String> consumer = str -> {
        try { queue.put(str); }
        catch (InterruptedException e) {}
    };
    static {
        for (int i = 0; i < 10; i++) queue.add("testData " + i + " ");
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {

        final Scheduler sch = Schedulers.newSingle("async-flux");

        return Flux.<String>generate(sink -> {
            try { sink.next(queue.take()); }
            catch (InterruptedException e) { }
        }).log().subscribeOn(sch);

    }

    @GetMapping("/add")
    public String add(@RequestParam String s) {
        consumer.accept(s);
        return s;
    }

}
...