Какой тип мьютекса следует использовать для кэширования данных асинхронного потока? - PullRequest
0 голосов
/ 20 апреля 2019

У меня есть кэш, который можно использовать только в одном потоке одновременно, и Cache::save блокирует:

struct Cache { /* ... */ } 
impl Cache { fn save(&mut self, data: Data) { /* ... */ }  }

Я хочу использовать это для сохранения данных из futures::sync::mpsc::UnboundedReceiver<Data>.Мне нужен какой-то Mutex, но этот Mutex должен хорошо играть с асинхронным.

Я нашел BiLock, но его API странный.

Например:

use futures::{future::Future, stream::Stream, sync::BiLock};
use std::path::Path;

fn main() {
    let cache = Cache::new(Path::new("/tmp/cache.txt"));
    let (lock_cache, _) = BiLock::new(cache);

    let (tx, rx) = futures::sync::mpsc::unbounded::<Data>();
    let fut = rx.for_each(|data: Data| {
        lock_cache
            .lock()
            .and_then(move |mut cache| cache.save(data))
    });
}

struct Data;

struct Cache {
    _marker: std::cell::UnsafeCell<()>,
}

impl Cache {
    fn new(path: &Path) -> Cache {
        unimplemented!();
    }

    fn save(&mut self, data: Data) -> Result<(), ()> {
        unimplemented!();
    }
}

Компиляция не удалась, поскольку ее можно использовать только один раз;BiLock::lock имеет подпись:

fn lock(self) -> BiLockAcquire<T>

Я могу создать Stream / Future на основе BiLock::poll_lock, но он возвращает BiLockGuard, который впоследствии невозможно использовать, потому что реальный кодследует использовать tokio_threadpool::blocking API, потому что Cache::save блокирует:

fn main() {
    let cache = Cache::new(Path::new("/tmp/cache.txt"));
    let (lock_cache, _) = BiLock::new(cache);

    let (tx, rx) = futures::sync::mpsc::unbounded::<Data>();
    let fut = rx.for_each(|data: Data| {
        lock_cache.lock().and_then(move |mut cache| {
            poll_fn(move || {
                tokio_threadpool::blocking(|| cache.save(data).unwrap())
                    .map_err(|_| panic!("the threadpool shut down"))
            })
        })
    });
}
...