Передача функции asyn c другому фрагменту кода (не может удовлетворить компилятор) - PullRequest
1 голос
/ 03 мая 2020

Я сейчас отправляю замыкания / функции между потоками.

Это прекрасно работает для функций syn c.

Я специально передаю pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>;

Пример отправляемая функция

pub fn update_league(req: WSReq, conn: PgConn, _: &mut WSConnections_, _: Uuid) -> Result<String, BoxError>{
    let deserialized = serde_json::from_value(req.data)?;
    let league = db::update_league(&conn, deserialized)?;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, league);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

Однако теперь я бы хотел переключиться на отправку асинхронных c функций,

т.е.

pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data)?;
    let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec())?;
    if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
        sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
    }
    publish_competitions(ws_conns, &competitions_out).await;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

Это точно такая же сигнатура функции, это просто asyn c.

Когда я упаковываю функции, чтобы их можно было отправлять, я получаю эту ошибку

Box::new(upsert_competitions))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found opaque type

full:

288 | pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    |                                                                                                                ------------------------ the `Output` of this `async fn`'s found opaque type
    |
    = note:     expected enum `std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future`
    = note: required for the cast to the object type `dyn for<'r> std::ops::Fn(warp_ws_server::WSReq, diesel::r2d2::PooledConnection<diesel::r2d2::ConnectionManager<diesel::PgConnection>>, &'r mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>, uuid::Uuid) -> std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> + std::marker::Send + std::marker::Sync`

I попытался присоединить .await к method(req, conn, ws_conns, user_ws_id).await, сайту вызова переданного метода.

Это приводит к ошибкам компилятора здесь из-за того, что Future не реализовано для Result. Поэтому

Я меняю тип с: Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync> -> Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Future<Output=Result<String, BoxError>>) + Send + Sync>

, он жалуется на размер фьючерса, поэтому я добавляю в коробку будущее, затем другую ошибку (см. Открепление), поэтому я прикрепляю ошибка.

В конечном итоге приводит к Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync>

Ошибка теперь составляет

Box::new(upsert_competitions) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::pin::Pin`, found opaque type

expected struct `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future

Я не понимаю, как go отсюда. Я не думаю, что мне следует закреплять / упаковывать результаты функций, я хочу закреплять / упаковывать будущее, возвращаемое при вызове функции, но я не думаю, что смогу это сделать,

, как наверняка я хочу быть бокс / закрепление будущего после его создания, когда я называю забавой c, а не раньше.

Я также пробовал такие вещи, как

Box::new(Pin::new(Box::new(upsert_competitions)))) на основе вышеуказанной ошибки,

и это дает мне ожидаемый Fn<blah> ...., а не Pin<Box<....

Источник полного обновленного кода:

CLosure type-def

закрытие успешно передается как обычная функция

закрытие безуспешно передается как asyn c fun c

вызывается замыкание

Редактировать:

Последние обновления (прогресс ошибки)

pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    async fn hmmm(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
        let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data).expect("fuck");
        println!("{:?}", &deserialized);
        let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec()).expect("fuck");
        // assume anything upserted the user wants to subscribe to
        if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
            sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
        }
        // TODO ideally would return response before awaiting publishing going out
        publish_competitions(ws_conns, &competitions_out).await;
        println!("{:?}", &competitions_out);
        let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
        let out = serde_json::to_string(&resp_msg).map_err(|e| e.into());
        out
    }
    Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
}
305 |     Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by 
`hmmm` is not `Sync`

Так что теперь просто нужно чтобы понять, как сделать этот будущий синтаксис c

note: future is not `Sync` as this value is used across an await

дает мне хорошую подсказку

299 |         publish_competitions(ws_conns, &competitions_out).await;
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await

происходит здесь, с conn может быть использовано позже

Выяснилось, что я должен использовать conn вне функции inner-asyn c, а не использовать через await.

После исправления переменных в await, я теперь получаю

error[E0621]: explicit lifetime required in the type of `ws_conns`
   --> src/handlers.rs:305:5
    |
289 | pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    |                                                                ------------------- help: add explicit lifetime `'static` to the type of `ws_conns`: `&'static mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>`
...
305 |     Box::pin(hmmm(req, competitions_out, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required

Пробовал делать & 'stati c ссылки, но в конце концов я попал в точку, где это не нормально.

Я также попытался использовать типы upsert_competitions<U: lock_api::RawMutex + 'static> generi c вместо

однако получение черты lock_api::mutex::RawMutex не реализовано для std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>

Мне нужно найти U, который реализует .lock (), но также является чертой, которую реализует Ar c.

Ответы [ 2 ]

2 голосов
/ 04 мая 2020

Возвращаемый тип асинхронной функции c при преобразовании в Fn переносится в Future, а не в закрепленное будущее, поскольку вам нужно только закрепить его, чтобы начать опрос. Создание будущего, закрепленного с самого начала, сделает процесс создания составных фьючерсов из нескольких асинхронных c функций менее эффективным и более сложным. Таким образом, правильный тип - pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> [[UNNAMED TYPE implementing Future]]<Result<String, BoxError> + Send + Sync>>; Но вы не можете назвать этот тип [[UNNAMED TYPE внедряет Future]], поэтому вам нужно пометить его вручную. Самый простой способ сделать это с помощью метода в штучной упаковке из FutureExt в будущем.

Так что вам нужно объединить изменение типа на Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync> с заменой получения ссылки на метод с Box::new(|req, conn, connections, uuid| upsert_competitions(req, conn, connections, uuid).boxed())

0 голосов
/ 04 мая 2020

user1937 простой ответ, вероятно, работает (протестирую позже),

однако в одночасье я понял, что подход к размещению функций в хэш-карте и перемещению ссылок на функцию ..... был немного излишним.

Это использование признаков (в одном месте я не знаю реализацию, но я могу определить интерфейс, а в другом месте подразумевать этот интерфейс)

Вместо этого я определил асин c - trait (в настоящее время требуется asyn c -trait crate) в моей lib

pub trait WSHandler<T: Subscriptions>{
    async fn ws_req_resp(
        msg: String, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
    ) -> Result<String, BoxError>;
}

и сказал, что это funcs, чтобы ожидать обобщенный c WsHandler

async fn handle_ws_msg<T: Subscriptions, U: WSHandler<T>>(
    msg: ws::Message, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
) -> ws::Message{
    match msg.to_str(){
        // Can't get await inside `and_then`/`map` function chains to work properly
        Ok(msg_str) => match U::ws_req_resp(msg_str.to_string(), conn, ws_conns, user_ws_id).await{
            Ok(text) => ws::Message::text(text),
            Err(e) => ws_error_resp(e.to_string())
        },
        Err(_) => ws_error_resp(String::from("wtf. How does msg.to_str fail?"))
    }
}

тогда в моей основной программе Я смог указать черту

struct A{
}

#[async_trait]
impl WSHandler<subscriptions::Subscriptions> for A{

    async fn ws_req_resp(
        msg: String, conn: PgConn, ws_conns: &mut WSConnections<subscriptions::Subscriptions>, user_ws_id: Uuid
    ) -> Result<String, BoxError>{
        let req: WSReq = serde_json::from_str(&msg)?;
        println!("{}", &req.data);
        let stringybob = String::from("upsert_competitions");
        match req.method.clone(){
            a if a == stringybob => upsert_competitions2(req, conn, ws_conns, user_ws_id).await,
            // imagine the other methods here
            uwotm8 => Err(Box::new(InvalidRequestError{description: uwotm8.to_string()}))
        }
    }
}
ws.on_upgrade(move |socket| warp_ws_server::handle_ws_conn::<subscriptions::Subscriptions, A>(socket, pool, ws_conns))

, и через 14 часов она наконец-то заработала. ура: D

...