Самый быстрый способ отправки множества групп HTTP-запросов с использованием нового синтаксиса async / await и контроля количества рабочих - PullRequest
4 голосов
/ 19 октября 2019

В самых последних потоках, которые я прочитал, говорится, что асинхронный - это лучший способ выполнить множество операций ввода-вывода, таких как отправка HTTP-запросов и тому подобное. Я недавно пытался выбрать асинхронный режим, но не могу понять, как отправлять множество групп запросов параллельно, например:

let client = reqwest::Client::new();
let mut requests = 0;

let get = client.get("https://somesite.com").send().await?;
let response = get.text().await?;

if response.contains("some stuff") {
    let get = client.get("https://somesite.com/something").send().await?;
    let response = get.text().await?;

    if response.contains("some new stuff") {
        requests += 1;
        println!("Got response {}", requests)

Это делает то, что я хочу, но как я могу запустить его параллельно иконтролировать количество «рабочих потоков» или что-то эквивалентное пулу потоков в async?

Я понимаю, что это похоже на этот вопрос , но мой строго говорит о ночных Rustсинтаксис async / await и более конкретный вариант использования, где необходимо выполнить группы запросов / задач. Я также нахожу использование комбинаторов для таких ситуаций немного запутанным, надеясь, что более новый стиль поможет сделать его немного более читабельным.

1 Ответ

0 голосов
/ 24 октября 2019

Не уверен, что это самый быстрый способ, поскольку я просто экспериментирую сам, но вот мое решение:

let client = reqwest::Client::new();

let links = vec![ // A vec of strings representing links
    "example.net/a".to_owned(), 
    "example.net/b".to_owned(),
    "example.net/c".to_owned(),
    "example.net/d".to_owned(),
    ];

let ref_client = &client; // Need this to prevent client from being moved into the first map
futures::stream::iter(links)
    .map(async move |link: String| {
        let res = ref_client.get(&link).send().await;

        // res.map(|res| res.text().await.unwrap().to_vec())
        match res { // This is where I would usually use `map`, but not sure how to await for a future inside a result 
            Ok(res) => Ok(res.text().await.unwrap()),
            Err(err) => Err(err), 
        }
    })
    .buffer_unordered(10) // Number of connection at the same time
    .filter_map(|c| future::ready(c.ok())) // Throw errors out, do your own error handling here
    .filter_map(|item| {
        if item.contains("abc") {
            future::ready(Some(item))
        } else {
            future::ready(None)
        }
    })
    .map(async move |sec_link| {
        let res = ref_client.get(&sec_link).send().await;
        match res {
            Ok(res) => Ok(res.text().await.unwrap()),
            Err(err) => Err(err),
        }
    })
    .buffer_unordered(10) // Number of connections for the secondary requests (so max 20 connections concurrently)
    .filter_map(|c| future::ready(c.ok()))
    .for_each(|item| {
        println!("File received: {}", item);
        future::ready(())
    })
    .await;

Для этого требуется функция #![feature(async_closure)].

...