Проблемы с производительностью и памятью в программе Async Rust - PullRequest
2 голосов
/ 11 июля 2019

Я хотел сравнить запросы от ржавчины к конкретному сервису, используя асинхронный клиент, и создал для этого асинхронный эталонный тестер.

Эта функция должна запускать указанное количество параллельных потоков (фактически, параллельных цепочек фьючерсов) в течение указанной продолжительностии сообщите количество выполненных итераций.

use futures::future;
use futures::prelude::*;
use std::error::Error;
use std::time::{Duration, Instant};
use std::{cell, io, rc};
use tokio::runtime::current_thread::Runtime;
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> Box<dyn Future<Item = (), Error = P::Error> + 'a> {
    Box::new(f().and_then(move |_| cycle(f)))
}

fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Error = io::Error> + 'a>(
    config: Config,
    f: F,
) -> impl Future<Item = u32, Error = io::Error> + 'a {
    let counter = rc::Rc::new(cell::Cell::new(0u32));
    let f = rc::Rc::new(f);
    future::select_all((0..config.workers).map({
        let counter = rc::Rc::clone(&counter);
        move |_| {
            let counter = rc::Rc::clone(&counter);
            let f = rc::Rc::clone(&f);
            cycle(move || {
                let counter = rc::Rc::clone(&counter);
                f().map(move |_| {
                    counter.set(counter.get() + 1);
                })
            })
        }
    }))
    .map(|((), _, _)| ())
    .map_err(|(err, _, _)| err)
    .select(
        timer::Delay::new(Instant::now() + config.duration)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description())),
    )
    .map(move |((), _)| counter.get())
    .map_err(|(err, _)| err)
}

fn main() {
    let duration = std::env::args()
        .skip(1)
        .next()
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be integer number");

    let ms = Duration::from_millis(1);

    let mut rt = Runtime::new().expect("Could not create runtime");

    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        /// Substitute actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .map_err(|err| panic!("Failed to set delay: {:?}", err))
                    },
                )
                .map_err(|err| panic!("Benchamrking error: {:?}", err)),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}

Тем не менее, результаты этого теста производительности и потребление памяти ухудшаются с увеличением продолжительности теста, например, на моем компьютере:

cargo run --release 1 ~ 900k итераций/ сек
cargo run --release 2 ~ 700 тыс. итераций / сек
cargo run --release 10 ~ 330 тыс. итераций / сек

Кроме того, использование памяти быстро растет по мере выполнения функции бенчмарка.Я попытался использовать valgrind, чтобы найти утечку памяти, но он только сообщает, что все выделенная память все еще может быть достигнута.

Как я могу исправить проблему?

Ответы [ 2 ]

2 голосов
/ 11 июля 2019

Похоже, что Box, возвращаемое cycle, не освобождается до конца benchmark, а выделение / удаление памяти занимает все больше и больше времени.

Я переписал вашу программу с async_await, без Box, и результаты теперь состоят из:

#![feature(async_await)]

use futures::{compat::Future01CompatExt, future, prelude::*, select};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

// Build infinitely repeating future
async fn cycle<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(f: F) {
    loop {
        f().await;
    }
}

async fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(
    config: Config,
    f: F,
) -> usize {
    let counter = AtomicUsize::new(0);

    let infinite_counter = future::select_all((0..config.workers).map(|_| {
        cycle(|| {
            f().map(|_| {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .boxed_local()
    }));

    let timer = timer::Delay::new(Instant::now() + config.duration)
        .compat()
        .unwrap_or_else(|_| panic!("Boom !"));

    select! {
        a = infinite_counter.fuse() => (),
        b = timer.fuse() => (),
    };

    counter.load(Ordering::SeqCst)
}

fn main() {
    let duration = std::env::args()
        .skip(1)
        .next()
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be integer number");

    let ms = Duration::from_millis(1);

    // Use actix_rt runtime instead of vanilla tokio because I want
    // to restrict to one OS thread and avoid needing async primitives
    let mut rt = actix_rt::Runtime::new().expect("Could not create runtime");;

    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        // Substitute actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .compat()
                            .unwrap_or_else(|_| panic!("Boom !"))
                    },
                )
                .boxed_local()
                .unit_error()
                .compat(),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}

Это мой первый раз с фьючерсами 0.3, так что у меня нет некоторых частей, таких как синтаксис select! или boxed_local, но это работает!


РЕДАКТИРОВАТЬ: Здесь блок зависимостей от Cargo.toml

[dependencies]
futures-preview = { version = "0.3.0-alpha", features = ["nightly", "compat", "async-await"] }
tokio = "0.1.22"
actix-rt = "0.2.3"
0 голосов
/ 11 июля 2019

Получается, что cycle действительно был виновником , как подозревал Грегори .Я нашел эту полезную функцию в футляре для фьючерсов: loop_fn и переписал cycle, используя ее:

/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> impl Future<Item = (), Error = P::Error> + 'a {
    future::loop_fn((), move |_| f().map(|_| future::Loop::Continue(())))
}

Остальная часть кода осталась прежней.Теперь он компилируется со стабильной ржавчиной и даже сообщает почти вдвое больше итераций в секунду, чем предлагаемое решение для ночных фьючерсов (чего стоит этот синтетический тест).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...