Я пытаюсь написать цикл событий Tokio, который может выполнять запрос на получение с того же сервера, со следующими характеристиками:
- Следует использовать пул соединений
- Как правило, запросы get выполняются медленно (> 1 с), поэтому они должны выполняться параллельно
- Сервер может не отвечать, поэтому мне нужно время ожидания. Если запрос не был получен, отправьте его снова
- Опрос получателя для новых URL, которые должны быть загружены. Они должны быть добавлены в цикл событий
До сих пор в моих попытках мне удавалось заставить работать разные комбинации из 4 предметов, но никогда не все вместе. Моя главная проблема в том, что я не совсем понимаю, как я могу добавить новые фьючерсы в цикл событий Tokio.
Я полагаю, мне нужно использовать loop_fn
для основного цикла, который опрашивает получателя, и handle.spawn
для создания новых задач? handle.spawn
допускает только фьючерсы Result<(),()>
, поэтому я не могу использовать его вывод для повторного вызова задания при неудаче, поэтому мне нужно перенести проверку повторов в это будущее?
Ниже приведена попытка, которая принимает и обрабатывает URL-адреса в пакетном режиме (поэтому нет непрерывного опроса) и имеет время ожидания (но без повторных попыток):
fn place_dls(&mut self, reqs: Vec<String>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let timeout = Timeout::new(Duration::from_millis(5000), &handle).unwrap();
let send_dls = stream::iter_ok::<_, reqwest::Error>(reqs.iter().map(|o| {
// send with request through an async reqwest client in self
}));
let rec_dls = send_dls.buffer_unordered(dls.len()).for_each(|n| {
n.into_body().concat2().and_then(|full_body| {
debug!("Received: {:#?}", full_body);
// TODO: how to put the download back in the queue if failure code is received?
})
});
let work = rec_dls.select2(timeout).then(|res| match res {
Ok(Either::A((got, _timeout))) => {
Ok(got)
},
Ok(Either::B((_timeout_error, _get))) => {
// TODO: put back in queue
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting",
).into())
}
Err(Either::A((get_error, _timeout))) => Err(get_error.into()),
Err(Either::B((timeout_error, _get))) => Err(timeout_error.into()),
});
core.run(work);
}
Моя попытка с loop_fn
была печально неудачной.