Вот простое приложение с весенней загрузкой:
@SpringBootApplication
@RestController
public class ReactiveApplication {
static Flux<String> fluxString;
static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();
private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
private static final long serialVersionUID = 1L;
@Override
public boolean add(String e) {
synchronized (this) {
notify();
}
return super.add(e);
}
@Override
public String poll() {
synchronized (this) {
if(isEmpty()) {
try {
wait();
} catch (InterruptedException ex) {}
}
}
return super.peek() == null ? "" : super.poll();
}
}
static Consumer<String> consumer = str -> queue.add(str);
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ReactiveApplication.class, args);
}
static {
for(int i = 0; i < 10; i++)
queue.add("testData " + i + " ");
}
@GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> home() {
Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
List<String> list = new ArrayList<>(queue);
queue.removeAll(queue);
fluxString = Flux.<String>create(sink -> {
sink.onRequest(n -> {
for(int i = 0; i < n; i++) {
sink.next(queue.poll());
}
}).onCancel(() -> sch.dispose());
}).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));
return fluxString;
}
@GetMapping("/add")
public String add( @RequestParam String s) {
consumer.accept(s);
return s;
}
}
Так что в основном это приложение создает поток String. Посещение /
захватит всю строку, представленную в строке, а затем объединит все, что добавлено из ресурса /add
(игнорируйте «Безопасные методы должны быть идемпотентными»).
То, что я считаю странным, - это когда Я перемещаю public static void main(...)
в строку 1, приложение начинает плохо себя вести, и добавление новых значений в /add
не имеет никакого эффекта. Я думаю, что должно быть что-то интересное, что делает приложение плохо себя ведет. Любое объяснение?