Как отправить данные через поток фьючерсов, написав через черту io :: Write? - PullRequest
1 голос
/ 16 апреля 2019

У меня есть функция, которая принимает &mut io::Write, и я хотел бы использовать ее для отправки потокового ответа с веб-сервера actix без необходимости буферизации всего ответа.Функция «выталкивает» данные, и я не могу изменить функцию (вот и вся предпосылка этого вопроса), чтобы использовать асинхронные потоки или другие виды опроса.

В настоящее время я вынужден использовать &mut Vec (который реализует io::Write), чтобы буферизовать весь результат и затем отправить Vec в качестве тела ответа.Однако ответ может быть большим, поэтому я бы предпочел потоковую передачу без буферизации.

Есть ли какой-то адаптер, который бы реализовывал io::Write, с блокировкой записи по мере необходимости в ответ на противодавление и был бы совместимымс типами, которые actix-web может использовать для ответов (например, futures::Stream)?

fn generate(output: &mut io::Write) {
    // ...
}

fn request_handler() -> Result<HttpResponse> {
    thread::spawn(|| generate(/*???*/));
    Ok(HttpResponse::Ok().body(/*???*/))
}

std::sync::mpsc и futures::mpsc имеют либо оба конца асинхронно, либо оба конца блокируются, поэтому не очевидно, какиспользуйте их в качестве адаптера между синхронизацией и асинхронным окончанием.

Ответы [ 2 ]

2 голосов
/ 19 апреля 2019

Это возможно.Ключевым элементом является futures::sink::Wait:

Комбинатор мойки, который преобразует асинхронный приемник в блокирующий приемник .

Созданометод Sink::wait, эта функция превращает любой приемник в блокирующую версию.Это реализуется путем блокирования текущего потока, когда приемник в противном случае не может сделать прогресс.

Все, что нужно, - это обернуть этот тип в структуру, которая реализует io::Write:

use futures::{
    sink::{Sink, Wait},
    sync::mpsc,
}; // 0.1.26
use std::{io, thread};

fn generate(_output: &mut io::Write) {
    // ...
}

struct MyWrite<T>(Wait<mpsc::Sender<T>>);

impl<T> io::Write for MyWrite<T>
where
    T: for<'a> From<&'a [u8]> + Send + Sync + 'static,
{
    fn write(&mut self, d: &[u8]) -> io::Result<usize> {
        let len = d.len();
        self.0
            .send(d.into())
            .map(|()| len)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0
            .flush()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }
}

fn foo() -> impl futures::Stream<Item = Vec<u8>, Error = ()> {
    let (tx, rx) = mpsc::channel(5);

    let mut w = MyWrite(tx.wait());

    thread::spawn(move || generate(&mut w));

    rx
}
0 голосов
/ 16 апреля 2019

Это невозможно.Actix-web управляет собственным буфером записи и сокетом.

...