У меня есть кэш, который можно использовать только в одном потоке одновременно, и Cache::save
блокирует:
struct Cache { /* ... */ }
impl Cache { fn save(&mut self, data: Data) { /* ... */ } }
Я хочу использовать это для сохранения данных из futures::sync::mpsc::UnboundedReceiver<Data>
.Мне нужен какой-то Mutex
, но этот Mutex
должен хорошо играть с асинхронным.
Я нашел BiLock
, но его API странный.
Например:
use futures::{future::Future, stream::Stream, sync::BiLock};
use std::path::Path;
fn main() {
let cache = Cache::new(Path::new("/tmp/cache.txt"));
let (lock_cache, _) = BiLock::new(cache);
let (tx, rx) = futures::sync::mpsc::unbounded::<Data>();
let fut = rx.for_each(|data: Data| {
lock_cache
.lock()
.and_then(move |mut cache| cache.save(data))
});
}
struct Data;
struct Cache {
_marker: std::cell::UnsafeCell<()>,
}
impl Cache {
fn new(path: &Path) -> Cache {
unimplemented!();
}
fn save(&mut self, data: Data) -> Result<(), ()> {
unimplemented!();
}
}
Компиляция не удалась, поскольку ее можно использовать только один раз;BiLock::lock
имеет подпись:
fn lock(self) -> BiLockAcquire<T>
Я могу создать Stream
/ Future
на основе BiLock::poll_lock
, но он возвращает BiLockGuard
, который впоследствии невозможно использовать, потому что реальный кодследует использовать tokio_threadpool::blocking
API, потому что Cache::save
блокирует:
fn main() {
let cache = Cache::new(Path::new("/tmp/cache.txt"));
let (lock_cache, _) = BiLock::new(cache);
let (tx, rx) = futures::sync::mpsc::unbounded::<Data>();
let fut = rx.for_each(|data: Data| {
lock_cache.lock().and_then(move |mut cache| {
poll_fn(move || {
tokio_threadpool::blocking(|| cache.save(data).unwrap())
.map_err(|_| panic!("the threadpool shut down"))
})
})
});
}