Поскольку StreamingBody
реализует Stream<Item = Vec<u8>, Error = Error>
, мы можем построить MCVE , который представляет это:
extern crate futures; // 0.1.25
use futures::{prelude::*, stream};
type Error = Box<std::error::Error>;
fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
stream::iter_ok(iter_of_owned_bytes)
}
Затем мы можем каким-то образом получить «потоковое тело» и использовать Stream::for_each
для обработки каждого элемента в Stream
. Здесь мы просто вызываем write_all
с некоторым указанным расположением вывода:
use std::{fs::File, io::Write};
fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}
Затем мы можем написать небольшую основную часть тестирования:
fn main() {
let mut file = Vec::new();
{
let fut = save_to_disk(&mut file);
fut.wait().expect("Could not drive future");
}
assert_eq!(file, b"0123456789ABCDEF");
}
Важные замечания о качестве этой наивной реализации:
Вызов write_all
может потенциально блокировать, что не следует делать в асинхронной программе. Было бы лучше передать эту блокирующую работу пулу потоков.
Использование Future::wait
заставляет поток блокироваться до тех пор, пока не будет завершено будущее, что отлично подходит для тестов, но может не подходить для вашего реального варианта использования.
Смотри также: