Распределение памяти в Rust при использовании каналов - PullRequest
0 голосов
/ 13 февраля 2020

Недавно я начал играть с Rust. Пытаюсь понять как это работает. После Kotlin, Typescript и Go это стало для меня головной болью) Я написал небольшое приложение, которое читает сообщения из канала и записывает их в файл. Я получаю неожиданное для меня использование памяти. Код ниже. Если кто-то может объяснить мне, что я делаю неправильно, я буду очень признателен.

use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::io::{Write};
use std::fs::File;
use std::fs::OpenOptions;
use jemalloc_ctl::{stats, epoch};

const MSG_NUM: usize = 10000000;
const BUFFER_SIZE: usize = 1000;

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {

    let e = epoch::mib().unwrap();
    let allocated = stats::allocated::mib().unwrap();
    let resident = stats::resident::mib().unwrap();

    let (sender, receiver): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = mpsc::channel();
    let (buffered_sender, buffered_receiver): (Sender<Vec<Message>>, Receiver<Vec<Message>>) = mpsc::channel();

    {
        for _ in 0..MSG_NUM {
            match sender.send(String::from("Hello World!").into_bytes()) {
                Ok(_) => continue,
                Err(err) => {
                    println!("Error {}", err);
                    continue
                },
            }
        }
        drop(sender)
    }

    e.advance().unwrap();
    println!("Step 1. {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);

    {
        let mut buffer: Vec<Message> = Vec::new();

        loop {

            let next_msg = match receiver.recv() {
                Ok(msg) => msg,
                Err(_) => {
                    println!("Channel closed for \"receiver\".");
                    break;
                }
            };

            buffer.push(Message {bytes: next_msg});

            if buffer.len() == BUFFER_SIZE {
                match buffered_sender.send(buffer.clone()) {
                    Ok(_) => {},
                    Err(err) => {
                        println!("Error: {}", err);
                        continue;
                    }
                }
                buffer.clear()
            }
        }

        drop(buffered_sender);
    };

    e.advance().unwrap();
    println!("Step 2. Excpected to see same amount of memory like in Step 1, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);

    thread::spawn(move || {
        let mut file = OpenOptions::new().create(true).append(true).open("foo.txt").unwrap();

        loop {
            match buffered_receiver.recv() {
                Ok(messages) => {
                    on_msg(messages, &mut file);
                },
                Err(_) => {
                    println!("Channel closed for \"buffered_receiver\".");
                    break;
                }
            };
        }

        e.advance().unwrap();
        println!("Step 3. Excpected to see around 0 MB allocated, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);
    });

    loop {

    }
}

fn on_msg(buffer: Vec<Message>, file: &mut File) {
    let mut bytes: Vec<u8> = Vec::new();
    for msg in buffer.iter() {
        bytes.extend(msg.bytes.iter());
    }
    let _ = file.write(&*bytes); 
}

#[derive(Clone)]
struct Message {
    bytes: Vec<u8>
}

Результат выполнения:

Step 1. 640 MB allocated. 653 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 886 MB allocated. 942 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 480 MB allocated. 880 MB resident

1 Ответ

1 голос
/ 13 февраля 2020

Вы отбрасываете только "отправляющие" стороны ваших каналов, и я ожидаю, что большая часть буферизации будет происходить на стороне получателя, поскольку отправка никогда не завершается, и вы можете вызвать recv () после закрытия канала.

Запустив исходный сценарий, я получаю следующее:

Step 1. 640 MB allocated. 652 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 886 MB allocated. 943 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 480 MB allocated. 943 MB resident

, изменяющий сценарий для отбрасывания получателей (receiver одновременно с buffered_sender и buffered_receiver в конце читай ветку, но перед наступлением эпохи) я получаю:

Step 1. 640 MB allocated. 652 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 406 MB allocated. 943 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 0 MB allocated. 943 MB resident

Между прочим перекрестные каналы (которые обычно считаются превосходящими stdlib в каждой точке), похоже, ведут себя так, как ты ожидал, преобразовав скрипт в них (что также позволяет упростить его, так как каналы перекладин могут быть повторены), я получаю:

Step 1. 490 MB allocated. 508 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 406 MB allocated. 790 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 0 MB allocated. 790 MB resident
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...