Темы еще живы после подписки - PullRequest
0 голосов
/ 17 сентября 2018

В настоящее время мы мигрируем из RxJava2 в Project Reactor.Чтобы парализовать работу, мы создаем новый поток через Schedulers.newThread() для каждого параллельного HTTP-запроса.Мы не можем повторно использовать потоки, потому что SecurityContext Spring привязан к ThreadLocal.

Недавно мы столкнулись с ошибкой, заключающейся в том, что через некоторое время JVM сталкивается с исключениями OutOfMemory, поскольку созданные потоки не были уничтожены, что привело к тысячам припаркованных потоков, с RxJava потоки, которые мы опускали после успешного HTTP-запроса.

Для RxJava код будет следующим: не показывать активных дополнительных потоков при наличии точки останова в последней строке.

Observable<String> deferred = Observable.fromCallable(() -> "1")
            .subscribeOn(io.reactivex.schedulers.Schedulers.newThread());
Observable<String> deferred2 = Observable.fromCallable(() -> "2")
            .subscribeOn(io.reactivex.schedulers.Schedulers.newThread());
System.out.println("obs: " + deferred.blockingSingle());
System.out.println("obs2: " + deferred2.blockingSingle());

Однако для Project Reactor оба потока все еще живы после обоихstdouts.

Mono<String> mono1 = Mono.fromSupplier(() -> "1").subscribeOn(Schedulers.newSingle("single"));
Mono<String> mono2 = Mono.fromSupplier(() -> "1").subscribeOn(Schedulers.newSingle("single"));
System.out.println("Mono1: " + mono1.block());
System.out.println("Mono2: " + mono2.block());

Решением для этого было бы отключить планировщик вручную в onFinally:

 Scheduler newSingle = Schedulers.newSingle("single");
 Mono<String> doFinally = Mono.defer(() -> Mono.fromSupplier(() -> "1")).subscribeOn(newSingle).doFinally(s -> {
        newSingle.dispose();
 });

Однако действительно ли это необходимо или есть способустановить такое же поведение, как в RxJava2?

...