Я сейчас отправляю замыкания / функции между потоками.
Это прекрасно работает для функций syn c.
Я специально передаю pub type WSMethod<T> = Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>;
Пример отправляемая функция
pub fn update_league(req: WSReq, conn: PgConn, _: &mut WSConnections_, _: Uuid) -> Result<String, BoxError>{
let deserialized = serde_json::from_value(req.data)?;
let league = db::update_league(&conn, deserialized)?;
let resp_msg = WSMsgOut::resp(req.message_id, req.method, league);
serde_json::to_string(&resp_msg).map_err(|e| e.into())
}
Однако теперь я бы хотел переключиться на отправку асинхронных c функций,
т.е.
pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data)?;
let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec())?;
if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
}
publish_competitions(ws_conns, &competitions_out).await;
let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
serde_json::to_string(&resp_msg).map_err(|e| e.into())
}
Это точно такая же сигнатура функции, это просто asyn c.
Когда я упаковываю функции, чтобы их можно было отправлять, я получаю эту ошибку
Box::new(upsert_competitions))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found opaque type
full:
288 | pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
| ------------------------ the `Output` of this `async fn`'s found opaque type
|
= note: expected enum `std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>`
found opaque type `impl core::future::future::Future`
= note: required for the cast to the object type `dyn for<'r> std::ops::Fn(warp_ws_server::WSReq, diesel::r2d2::PooledConnection<diesel::r2d2::ConnectionManager<diesel::PgConnection>>, &'r mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>, uuid::Uuid) -> std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> + std::marker::Send + std::marker::Sync`
I попытался присоединить .await
к method(req, conn, ws_conns, user_ws_id).await
, сайту вызова переданного метода.
Это приводит к ошибкам компилятора здесь из-за того, что Future
не реализовано для Result
. Поэтому
Я меняю тип с: Box<dyn Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Result<String, BoxError> + Send + Sync>
-> Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Future<Output=Result<String, BoxError>>) + Send + Sync>
, он жалуется на размер фьючерса, поэтому я добавляю в коробку будущее, затем другую ошибку (см. Открепление), поэтому я прикрепляю ошибка.
В конечном итоге приводит к Box<dyn (Fn(WSReq, PgConn, &mut WSConnections<T>, Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync >>) + Send + Sync>
Ошибка теперь составляет
Box::new(upsert_competitions)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::pin::Pin`, found opaque type
expected struct `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync>>`
found opaque type `impl core::future::future::Future
Я не понимаю, как go отсюда. Я не думаю, что мне следует закреплять / упаковывать результаты функций, я хочу закреплять / упаковывать будущее, возвращаемое при вызове функции, но я не думаю, что смогу это сделать,
, как наверняка я хочу быть бокс / закрепление будущего после его создания, когда я называю забавой c, а не раньше.
Я также пробовал такие вещи, как
Box::new(Pin::new(Box::new(upsert_competitions))))
на основе вышеуказанной ошибки,
и это дает мне ожидаемый Fn<blah>
...., а не Pin<Box<....
Источник полного обновленного кода:
CLosure type-def
закрытие успешно передается как обычная функция
закрытие безуспешно передается как asyn c fun c
вызывается замыкание
Редактировать:
Последние обновления (прогресс ошибки)
pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
async fn hmmm(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data).expect("fuck");
println!("{:?}", &deserialized);
let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec()).expect("fuck");
// assume anything upserted the user wants to subscribe to
if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
}
// TODO ideally would return response before awaiting publishing going out
publish_competitions(ws_conns, &competitions_out).await;
println!("{:?}", &competitions_out);
let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
let out = serde_json::to_string(&resp_msg).map_err(|e| e.into());
out
}
Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
}
305 | Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by
`hmmm` is not `Sync`
Так что теперь просто нужно чтобы понять, как сделать этот будущий синтаксис c
note: future is not `Sync` as this value is used across an await
дает мне хорошую подсказку
299 | publish_competitions(ws_conns, &competitions_out).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await
происходит здесь, с conn
может быть использовано позже
Выяснилось, что я должен использовать conn
вне функции inner-asyn c, а не использовать через await.
После исправления переменных в await, я теперь получаю
error[E0621]: explicit lifetime required in the type of `ws_conns`
--> src/handlers.rs:305:5
|
289 | pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
| ------------------- help: add explicit lifetime `'static` to the type of `ws_conns`: `&'static mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>`
...
305 | Box::pin(hmmm(req, competitions_out, ws_conns, user_ws_id))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required
Пробовал делать & 'stati c ссылки, но в конце концов я попал в точку, где это не нормально.
Я также попытался использовать типы upsert_competitions<U: lock_api::RawMutex + 'static>
generi c вместо
однако получение черты lock_api::mutex::RawMutex
не реализовано для std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>
Мне нужно найти U, который реализует .lock (), но также является чертой, которую реализует Ar c.