Добавить новые задачи в цикл событий Tokio и повторить задачи при сбое - PullRequest
0 голосов
/ 16 ноября 2018

Я пытаюсь написать цикл событий Tokio, который может выполнять запрос на получение с того же сервера, со следующими характеристиками:

  • Следует использовать пул соединений
  • Как правило, запросы get выполняются медленно (> 1 с), поэтому они должны выполняться параллельно
  • Сервер может не отвечать, поэтому мне нужно время ожидания. Если запрос не был получен, отправьте его снова
  • Опрос получателя для новых URL, которые должны быть загружены. Они должны быть добавлены в цикл событий

До сих пор в моих попытках мне удавалось заставить работать разные комбинации из 4 предметов, но никогда не все вместе. Моя главная проблема в том, что я не совсем понимаю, как я могу добавить новые фьючерсы в цикл событий Tokio.

Я полагаю, мне нужно использовать loop_fn для основного цикла, который опрашивает получателя, и handle.spawn для создания новых задач? handle.spawn допускает только фьючерсы Result<(),()>, поэтому я не могу использовать его вывод для повторного вызова задания при неудаче, поэтому мне нужно перенести проверку повторов в это будущее?

Ниже приведена попытка, которая принимает и обрабатывает URL-адреса в пакетном режиме (поэтому нет непрерывного опроса) и имеет время ожидания (но без повторных попыток):

fn place_dls(&mut self, reqs: Vec<String>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let timeout = Timeout::new(Duration::from_millis(5000), &handle).unwrap();

    let send_dls = stream::iter_ok::<_, reqwest::Error>(reqs.iter().map(|o| {
        // send with request through an async reqwest client in self
    }));

    let rec_dls = send_dls.buffer_unordered(dls.len()).for_each(|n| {
        n.into_body().concat2().and_then(|full_body| {
            debug!("Received: {:#?}", full_body);

            // TODO: how to put the download back in the queue if failure code is received?
        })
    });

    let work = rec_dls.select2(timeout).then(|res| match res {
        Ok(Either::A((got, _timeout))) => {
            Ok(got)
        },
        Ok(Either::B((_timeout_error, _get))) => {
            // TODO: put back in queue
            Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "Client timed out while connecting",
            ).into())
        }
        Err(Either::A((get_error, _timeout))) => Err(get_error.into()),
        Err(Either::B((timeout_error, _get))) => Err(timeout_error.into()),
    });

    core.run(work);
}

Моя попытка с loop_fn была печально неудачной.

1 Ответ

0 голосов
/ 25 ноября 2018

Я предполагаю, что мне нужно использовать loop_fn для основного цикла

Я бы предложил немного другой подход: реализовать futures::sync::mpsc::Receiver потокового потребителя вместо цикла.

Это можно рассматривать как своего рода главный процесс: после получения URL-адреса через Receiver может быть запущена задача Tokio для загрузки содержимого. Тогда не будет проблем с повторной попыткой: просто отправьте сбойный или тайм-аут URL снова на главный канал через конечную точку Sender.

Вот эскиз рабочего кода:

extern crate futures;
extern crate tokio;

use std::{io, time::{Duration, Instant}};
use futures::{
    Sink,
    Stream,
    stream,
    sync::mpsc,
    future::Future,
};
use tokio::{
    runtime::Runtime,
    timer::{Delay, Timeout},
};

fn main() -> Result<(), io::Error> {
    let mut rt = Runtime::new()?;
    let executor = rt.executor();

    let (tx, rx) = mpsc::channel(0);
    let master_tx = tx.clone();
    let master = rx.for_each(move |url: String| {
        let download_future = download(&url)
            .map(|_download_contents| {
                // TODO: actually do smth with contents
                ()
            });
        let timeout_future =
            Timeout::new(download_future, Duration::from_millis(2000));
        let job_tx = master_tx.clone();
        let task = timeout_future
            .or_else(move |error| {
                // actually download error or timeout, retry
                println!("retrying {} because of {:?}", url, error);
                job_tx.send(url).map(|_| ()).map_err(|_| ())
            });
        executor.spawn(task);
        Ok(())
    });

    rt.spawn(master);

    let urls = vec![
        "http://url1".to_string(),
        "http://url2".to_string(),
        "http://url3".to_string(),
    ];
    rt.executor()
        .spawn(tx.send_all(stream::iter_ok(urls)).map(|_| ()).map_err(|_| ()));

    rt.shutdown_on_idle().wait()
        .map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown failure"))
}

#[derive(Debug)]
struct DownloadContents;
#[derive(Debug)]
struct DownloadError;

fn download(url: &str) -> Box<Future<Item = DownloadContents, Error = DownloadError> + Send> {
    // TODO: actually download here

    match url {
        // url2 always fails
        "http://url2" => {
            println!("FAILED downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(1000))
                .map_err(|_| DownloadError)
                .and_then(|()| Err(DownloadError));
            Box::new(future)
        },
        // url3 always timeouts
        "http://url3" => {
            println!("TIMEOUT downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(5000))
                .map_err(|_| DownloadError)
                .and_then(|()| Ok(DownloadContents));
            Box::new(future)
        },
        // everything else succeeds
        _ => {
            println!("SUCCESS downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(50))
                .map_err(|_| DownloadError)
                .and_then(|()| Ok(DownloadContents));
            Box::new(future)
        },
    }
}
...