В настоящее время мы мигрируем из RxJava2 в Project Reactor.Чтобы парализовать работу, мы создаем новый поток через Schedulers.newThread()
для каждого параллельного HTTP-запроса.Мы не можем повторно использовать потоки, потому что SecurityContext Spring привязан к ThreadLocal
.
Недавно мы столкнулись с ошибкой, заключающейся в том, что через некоторое время JVM сталкивается с исключениями OutOfMemory, поскольку созданные потоки не были уничтожены, что привело к тысячам припаркованных потоков, с RxJava потоки, которые мы опускали после успешного HTTP-запроса.
Для RxJava код будет следующим: не показывать активных дополнительных потоков при наличии точки останова в последней строке.
Observable<String> deferred = Observable.fromCallable(() -> "1")
.subscribeOn(io.reactivex.schedulers.Schedulers.newThread());
Observable<String> deferred2 = Observable.fromCallable(() -> "2")
.subscribeOn(io.reactivex.schedulers.Schedulers.newThread());
System.out.println("obs: " + deferred.blockingSingle());
System.out.println("obs2: " + deferred2.blockingSingle());
Однако для Project Reactor оба потока все еще живы после обоихstdouts.
Mono<String> mono1 = Mono.fromSupplier(() -> "1").subscribeOn(Schedulers.newSingle("single"));
Mono<String> mono2 = Mono.fromSupplier(() -> "1").subscribeOn(Schedulers.newSingle("single"));
System.out.println("Mono1: " + mono1.block());
System.out.println("Mono2: " + mono2.block());
Решением для этого было бы отключить планировщик вручную в onFinally
:
Scheduler newSingle = Schedulers.newSingle("single");
Mono<String> doFinally = Mono.defer(() -> Mono.fromSupplier(() -> "1")).subscribeOn(newSingle).doFinally(s -> {
newSingle.dispose();
});
Однако действительно ли это необходимо или есть способустановить такое же поведение, как в RxJava2?