Не удается освободить память Dynami c в задаче Asyn c Rust - PullRequest
1 голос
/ 10 января 2020

Оказалось, что в нашем приложении Rust произошла утечка памяти, и я разобрал проблему до приведенного ниже примера кода. Я до сих пор не вижу, в чем проблема.

Я ожидаю, что в сообщении (500 000 + 1) память приложения вернется к низким уровням. Вместо этого я наблюдаю следующее:

  • до отправки 500 000 сообщений использование памяти составляет 124 КБ
  • после отправки 500 000 сообщений, использование памяти увеличивается до 27 МБ
  • после отправки 500 000 + 1 сообщение использование памяти уменьшается до 15,5 МБ

После многих попыток я не могу найти, где скрывается 15,5 МБ. Единственный способ освободить память - убить приложение. Valgrind не обнаружил никаких утечек памяти. Обход, решение или указание в правильном направлении - все это очень важно.

Демонстрационный проект с кодом ниже можно найти здесь: https://github.com/loriopatrick/mem-help

Примечания

  • Если я удаляю self.items.push(data); использование памяти не увеличивается, поэтому я не думаю, что это проблема с Отправителем / Получателем
  • Упаковка items: Vec<String> в Arc<Mutex<..>> нет заметной разницы в памяти

Задача, в которой память должна управляться

struct Processor {
    items: Vec<String>,
}

impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }

    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);

            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }

        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}

Пример полного запуска

use std::time::Duration;

use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};

struct Processor {
    items: Vec<String>,
}

impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }

    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);

            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }

        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}

pub fn main() {
    {
        let mut runtime: Runtime = tokio::runtime::Builder::new()
            .threaded_scheduler()
            .core_threads(1)
            .enable_all()
            .build()
            .expect("Failed to build runtime");

        let (mut sender, receiver) = channel(1024);
        let p = Processor::new();

        runtime.spawn(async move {
            println!("Before send, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;

            for i in 0..500000 {
                sender.send("Hello".to_string()).await;
            }

            println!("Sent 500,000 items, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
            sender.send("Hello".to_string()).await;

            println!("Send message to clear items");
            tokio::time::delay_for(Duration::from_secs(3)).await;

            println!("Closing sender in 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });

        runtime.block_on(async move {
            {
                p.task(receiver).await;
            }
            println!("Task is done, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });
    }

    println!("Runtime closed, waiting 5 seconds");
    std::thread::sleep(Duration::from_secs(5));
}

Автомобиль go .toml

[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }
...