Оказалось, что в нашем приложении Rust произошла утечка памяти, и я разобрал проблему до приведенного ниже примера кода. Я до сих пор не вижу, в чем проблема.
Я ожидаю, что в сообщении (500 000 + 1) память приложения вернется к низким уровням. Вместо этого я наблюдаю следующее:
- до отправки 500 000 сообщений использование памяти составляет 124 КБ
- после отправки 500 000 сообщений, использование памяти увеличивается до 27 МБ
- после отправки 500 000 + 1 сообщение использование памяти уменьшается до 15,5 МБ
После многих попыток я не могу найти, где скрывается 15,5 МБ. Единственный способ освободить память - убить приложение. Valgrind не обнаружил никаких утечек памяти. Обход, решение или указание в правильном направлении - все это очень важно.
Демонстрационный проект с кодом ниже можно найти здесь: https://github.com/loriopatrick/mem-help
Примечания
- Если я удаляю
self.items.push(data);
использование памяти не увеличивается, поэтому я не думаю, что это проблема с Отправителем / Получателем - Упаковка
items: Vec<String>
в Arc<Mutex<..>>
нет заметной разницы в памяти
Задача, в которой память должна управляться
struct Processor {
items: Vec<String>,
}
impl Processor {
pub fn new() -> Self {
Processor {
items: Vec::new(),
}
}
pub async fn task(mut self, mut receiver: Receiver<String>) {
while let Some(data) = receiver.next().await {
self.items.push(data);
if self.items.len() > 500000 {
{
std::mem::replace(&mut self.items, Vec::new());
}
println!("Emptied items array");
}
}
println!("Processor task closing in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
}
}
Пример полного запуска
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
struct Processor {
items: Vec<String>,
}
impl Processor {
pub fn new() -> Self {
Processor {
items: Vec::new(),
}
}
pub async fn task(mut self, mut receiver: Receiver<String>) {
while let Some(data) = receiver.next().await {
self.items.push(data);
if self.items.len() > 500000 {
{
std::mem::replace(&mut self.items, Vec::new());
}
println!("Emptied items array");
}
}
println!("Processor task closing in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
}
}
pub fn main() {
{
let mut runtime: Runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.expect("Failed to build runtime");
let (mut sender, receiver) = channel(1024);
let p = Processor::new();
runtime.spawn(async move {
println!("Before send, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
for i in 0..500000 {
sender.send("Hello".to_string()).await;
}
println!("Sent 500,000 items, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
sender.send("Hello".to_string()).await;
println!("Send message to clear items");
tokio::time::delay_for(Duration::from_secs(3)).await;
println!("Closing sender in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
});
runtime.block_on(async move {
{
p.task(receiver).await;
}
println!("Task is done, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
});
}
println!("Runtime closed, waiting 5 seconds");
std::thread::sleep(Duration::from_secs(5));
}
Автомобиль go .toml
[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }