Есть ли способ вызвать asyn c интерфейс ржавчины из python? - PullRequest
0 голосов
/ 20 июня 2020

Я оборачиваю некоторые функции reqwest ржавчины в файл req.lib и успешно вызываю его из python, используя cffi. Однако reqwest::blocking::Client заставляет меня использовать многопоточность в python. Я обнаружил, что reqwest можно вызвать в режиме asyn c в ржавчине. Интересно, есть ли способ сделать req.lib asyn c? даже полусинхронный c мне подходит.

Например, в настоящее время подпись заглушки:

#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> *mut c_char

Могу я написать что-то вроде:

#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> u64  // return request unique id

#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool  // whether given request is done

#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *mut c_char  // fetch response

Следовательно, вызовы cffi больше не блокируют основной поток. Я могу использовать один поток для вызова нескольких запросов. Любой совет или передовой опыт приветствуются.

1 Ответ

1 голос
/ 20 июня 2020

Асинхронный код выполняется через специальную среду выполнения, для python и ржавчины это разные и несовместимые библиотеки. Там вы не можете просто разделить будущее между языками, оно должно быть запущено на том же языке, на котором оно было создано.

Что касается вашего примера, это означает, что вам нужно запустить Client в исполнителе ржавчины (например, в tokio), а затем получить от него обратную связь. В качестве простейшего способа вы можете просто создать глобальную:

use lazy_static::lazy_static;
use tokio::runtime::Runtime;

lazy_static! {
    static ref RUNTIME: Runtime = Runtime::new().unwrap();
}

Затем после появления вам необходимо получить обратную связь, поэтому вы можете использовать пару карт со статусами и результатами:

use std::collections::HashMap;
use std::sync::RwLock;

use futures::prelude::*;
use tokio::sync::oneshot;

type FutureId = u64;
type UrlResult = reqwest::Result<String>;

type SyncMap<K, V> = RwLock<HashMap<K, V>>;

lazy_static! {
    // Map for feedback channels. Once result is computed, it is stored at `RESULTS`
    static ref STATUSES: SyncMap<FutureId, oneshot::Receiver<UrlResult>> = SyncMap::default();
    // Cache storage for results
    static ref RESULTS: SyncMap<FutureId, UrlResult> = SyncMap::default();
}

fn gen_unique_id() -> u64 { .. }

#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> FutureId {
    let url: &str = /* convert url */;

    let (tx, rx) = oneshot::channel();

    RUNTIME.spawn(async move {
        let body = reqwest::get(url).and_then(|b| b.text()).await;
        tx.send(body).unwrap(); // <- this one should be handled somehow
    });

    let id = gen_unique_id();

    STATUSES.write().unwrap().insert(id, rx);

    id
}

Здесь для каждого urlopen создается запрос oneshot::channel, что задерживает результат выполнения. Таким образом, можно проверить, завершен он или нет:

#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool {
    // first check in cache
    if RESULTS.read().unwrap().contains_key(&req_id) {
        true
    } else {
        let mut res = RESULTS.write().unwrap();
        let mut statuses = STATUSES.write().unwrap();

        // if nothing in cache, check the feedback channel
        if let Some(rx) = statuses.get_mut(&req_id) {
            let val = match rx.try_recv() {
                Ok(val) => val,
                Err(_) => {
                    // handle error somehow here
                    return true;
                }
            };

            // and cache the result, if available
            res.insert(req_id, val);
            true
        } else {
            // Unknown request id
            true
        }
    }
}

Тогда результат выборки будет довольно тривиальным:

#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *const c_char {
    let res = RESULTS.read().unwrap();

    res.get(&req_id)
        // there `ok()` should probably be handled in some better way
        .and_then(|val| val.as_ref().ok())
        .map(|val| val.as_ptr())
        .unwrap_or(std::ptr::null()) as *const _
}

Площадка ссылка.

Имейте в виду, что приведенное выше решение имеет свои преимущества:

  • результат кэшируется и может быть извлечен несколько раз;
  • API (надеюсь) поточно-ориентированный;
  • блокировки чтения и записи разделены, что может быть более быстрым решением, чем мьютекс;

, а также значительные недостатки:

  • RESULTS неограниченно растет и никогда не очищается;
  • потокобезопасность делает вещь немного сложной, поэтому может быть ненужным и thread_local! может использоваться для глобальных переменных вместо блокировок;
  • отсутствует правильной обработки ошибок;
  • RwLock используется, что иногда может вести себя хуже, чем некоторые другие примитивы;
  • STATUSES at is_finished получить доступ на запись, хотя может быть лучше иметь чтение сначала доступ;
...