Есть ли способ устранить эту утечку памяти с помощью моего кода CompletableFuture? - PullRequest
0 голосов
/ 21 июня 2020

У нас есть сервер, который выполняет обратное давление в http1.1 (да, он отключает сокеты, и клиенты фактически замирают в ожидании, если мы находимся под нагрузкой ... это довольно приятно). Мы делаем это и в http2 без того, чтобы спецификация http2 создавала противодавление в движке http2 (ie. Нам не нужно противодавление там, но мы следуем spe c, и он также отключает сокеты). Когда я говорю «выключить», я имею в виду, что он отменяет регистрацию, поэтому мы перестаем читать ni c, и наш буфер ni c заполняется, а затем клиенты ni c заполняются, пока он не зависнет.

В любом случае , мы столкнулись с интересной проблемой и пытаемся решить ее без утечки памяти. Наша последняя итерация была примерно такой:

public CompletableFuture<Void> runLoop(List<T> newData, Session session, Processor<T> processFunction) {
    
    //All the below futures must be chained with previous ones in case previous ones are not
    //done which will serialize it all to be in sequence
    CompletableFuture<Void> future = session.getProcessFuture();
    
    for(T data : newData) {
        //VERY IMPORTANT: Writing the code like this would slam through calling process N times
        //BUT it doesn't give the clients a chance to set a flag between packets
        //Mainly done for exceptions and streaming so you can log exc, set a boolean so you
        //don't get 100 exceptions while something is happening like socket disconnect
        //In these 2 lines of code, processCorrectly is CALLED N times RIGHT NOW
        //The code below this only calls them right now IF AND ONLY IF the client returns
        //a completed future each time!!!
        //CompletableFuture<Void> messageFuture = processFunction.process(data);
        //future = future.thenCompose( f -> messageFuture);
        
        future = future.thenCompose( voidd -> processFunction.process(data));
    }
    
    session.setProcessFuturee(future);
    
    return future;
}

По мере поступления данных L oop вызывается для N блоков данных. runL oop всегда вызывается последовательно, поэтому у нас нет условий гонки. Мы вызываем runL oop X раз. Проблема в том, что в текущем коде выше processFunction.process МОЖЕТ быть вызван в другом потоке (и на самом деле это довольно часто только для наших конечных точек потоковой передачи).

Мне интересно, как перерезать шнур, так сказать так что мы остановили цепь. комментирование session.setProcessFuture ДЕЙСТВИТЕЛЬНО останавливает цепочку, но проблема заключается в том, что входящие данные, поступающие на 2-е место, могут затем превзойти данные, которые поступили первыми (и имеют, но редко).

Тест, который я использую, если мы комментарий setProcessFuture колеблется между 500 МБ и 6 МБ памяти, по ссылке ниже. Если я не буду комментировать это, он медленно использует все 500 МБ.

https://github.com/deanhiller/webpieces/blob/master/core/core-util/src/test/java/org/webpieces/util/futures/TestLoopingChainMemory.java

Попытка избежать состояния гонки и проблем с памятью и также разрешить обратное давление, которое выполняется с использованием будущего, которое мы возвращаем из runL oop (). Если остается слишком много нерешенных, мы прекращаем подавать трафик c для запуска L oop.

FYI: thenCompose == scala FlatMap

EDIT: у меня была другая идея и поэтому я попробовал это, но это тоже не удалось. Размер списка обычно довольно мал в производстве, поэтому я не возражал против сложения фьючерсов в l oop, но попытался разрезать цепочку за пределами l oop, чтобы память не складывалась со временем и никогда не очищалась. ...

public CompletableFuture<Void> runLoop(List<T> newData, Session session, Processor<T> processFunction) {
    
    //All the below futures must be chained with previous ones in case previous ones are not
    //done which will serialize it all to be in sequence
    CompletableFuture<Void> future = session.getProcessFuture();
    
    CompletableFuture<Void> newFuture = new CompletableFuture<Void>(); 
            
    for(T data : newData) {
        //VERY IMPORTANT: Writing the code like this would slam through calling process N times
        //BUT it doesn't give the clients a chance to seet a flag between packets
        //Mainly done for exceptions and streaming so you can log exc, set a boolean so you
        //don't get 100 exceptions while something is happening like socket disconnect
        //In these 2 lines of code, processCorrectly is CALLED N times RIGHT NOW
        //The code below this only calls them right now IF AND ONLY IF the client returns
        //a completed future each time!!!
        
        //This seems to have memory issues as well....
        //CompletableFuture<Void> temp = processFunction.process(data);
        //future = future.thenCompose(f -> temp);
        
        future = future.thenCompose( voidd -> processFunction.process(data));
    }
    
    future.handle((voidd, t) -> {
        if(t != null) {
            newFuture.completeExceptionally(t);
            return 0;
        }
        
        newFuture.complete(null);
        return 0;
    });
    
    //comment this out and memory leak goes away of course.......
    session.setProcessFuturee(newFuture);
    
    return newFuture;
}

EDIT: хорошо, я обнаружил, что эта строка помогает, НО мне пришлось добавить код, чтобы ждать гораздо дольше очистки памяти. На очистку ушло «много времени», и объем памяти уменьшился до 13 МБ. Теперь мне интересно, почему очистка занимает так много времени ... возможно, объекты добрались до более старых поколений в модели g c

future = future.thenComposeAsync( voidd -> processFunction.process(data), executor );

ТОГДА, я понял, а если ждать по моему исходному коду . Здесь все стало странно. Он только вернулся к 196 МБ и остался там. Я не уверен, почему и что это за ссылка. Я действительно ничего не вижу в MAT (eclipse) или неправильно использую этот инструмент.

У меня есть утечка памяти? Я смущен этим последним результатом. он должен go до <20 МБ по крайней мере </strong>

РЕДАКТИРОВАТЬ (обращаясь к ответу Шадова): Я считаю, что возвращенное будущее не привязано к тому месту, откуда оно пришло. Это, так сказать, «хвост» цепи Я думаю . Однако все наоборот, будущее, которое его создало, привязано к этому, так что когда будущее, которое его создало, будет сделано, оно может решить это будущее. Итак, на мой взгляд, я каждый раз отправляю хвост списка в сеансе (не создавая цепочку, которая длиннее и длиннее). Настоящая проблема с фьючерсами - это нить, которая ссылается на нее, и была ли она решена. Это означает, что единственная оставшаяся ссылка - это анонимный Runnable, который разрешает это. Как только это произойдет, оно должно разрешиться.

CompletableFuture даже не здесь память (MAT eclipse) ..

введите описание изображения здесь

OMG, еще одно важное открытие. . Если я закомментирую эту строку

  • future.complete (null);

, тогда программа будет летать вечно, а объем памяти колеблется между 180 МБ и 6 МБ.

Ответы [ 2 ]

1 голос
/ 21 июня 2020

Не очень хорош в этом, но вы строите все большее и большее будущее и никогда не останавливаетесь - независимо от размера списка, он будет только потреблять все больше и больше памяти, никогда не освобождая ее.

Вы будете посмотрите, сделаете ли вы в своем тесте:

chain.runLoop(list, s, p);
if(queue.size() == 0) {
  System.out.println("queue empty");
  s.setProcessFuturee(CompletableFuture.completedFuture(null));
  rt.gc();
}

Так что, на мой взгляд, вам нужно использовать другой инструмент, CompletableFuture может быть недостаточно сильным для этого. Может быть, какая-нибудь полноценная реактивная библиотека, вроде rx java или Reaction?

0 голосов
/ 21 июня 2020

Хорошо, я сделал коммит git pu sh и git. Утечки памяти нет. Происходит то, что мы так сильно хлопаем по основному потоку, что все накапливается ........ когда дается время для решения, память возвращается к 6 МБ. Это занимает ОЧЕНЬ много времени, но журналы помогли увидеть, что это проще, И я, что наиболее важно, пропустил вызов runtime.g c () в критическом месте !!!!

soooo, с противодавлением на месте, у нас не должно быть проблем, поскольку это заставляет все фьючерсы разрешаться перед добавлением нагрузки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...