Rxjava2 OutOfMemoryError - PullRequest
       8

Rxjava2 OutOfMemoryError

0 голосов
/ 11 декабря 2018

Я получаю ошибку OutOfMemory очень часто.

Образец кода:

Flowable.fromIterable(req.getAllClaims()).filter(Objects::nonNull)
            .flatMap(data -> {
                ClaimStatusCollector collector = statsCollector.get(data.getName());
                collector.setVehicleHelper(vehicleHelper);
                collector.setMedicalHelper(medicalHelper);
                return Flowable.fromCallable(() -> collector.verifyStatus(data)
                        );
            }, 5)
            .blockingIterable().forEach(data -> {
                claims.add(data.blockingFirst());
            });

VehicleClaimStatusCollector.java 
@Override
public Flowable<Claim> verifyStatus(Claim claim)
{
    return Flowable.create(emitter -> {
        try
        {
            //external http call
            emitter.onNext(claim);
        }
        catch (Exception e)
        {
            emitter.onNext(claim);
        }
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);

}

MedicalClaimStatusCollector.java 
@Override
public Flowable<Claim> verifyStatus(Claim claim)
{
    return Flowable.create(emitter -> {
        try
        {
            //external http call
            emitter.onNext(claim);
        }
        catch (Exception e)
        {
            emitter.onNext(claim);
        }
        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);

}

RxCachedThreadScheduler-6034 "# 6994 демон prio = 5 os_prio = 31 tid = 0x00007ffcab9ce800 nid = 0x42d07 runnable [0x0000700ava.glan.dlanСостояние: RUNNABLE на java.net.SocketInputStream.socketRead0 (собственный метод)

Совершенно ясно, что мы получаем исключение для сокета из внешнего http-запроса. Я также установил readtimeout в клиенте jersey. Все потокивисят там и выдает ошибку памяти.

java.lang.OutOfMemoryError: невозможно создать новый собственный поток в java.lang.Thread.start0 (собственный метод) в java.lang.Thread.start(Thread.java:714) в java.util.concurrent.ThreadPoolExecutor.addWorker (ThreadPoolExecutor.java:950) в java.util.concurrent.ThreadPoolExecutor.ensurePrestart (ThreadPoolExecutor.java:1587) в запланированном виде.delayedExecute (ScheduledThreadPoolExecutor.java:334) в java.util.concurrent.ScheduledThreadPoolExecutor.schedule (ScheduledThreadPoolExecutor.java:549) в java.util.concurrent.ScheduledThreadPoolExecutor.submit (ScheduledThreadPoolExecutor.java:648) в io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual (NewThreadWorker.jternal14) io.IoScheduler $ EventLoopWorker.) в io.reactivex.Flowable.subscribe (Flowable.java:13234) в io.reactivex.Flowable.subscribe (Flowable.java:13170) в io.reactivex.Flowable.subscribe (Flowable.java:13091)

Может ли кто-нибудь помочь мне в этом?

...