Как десериализовать сообщения в актерах Actix? - PullRequest
0 голосов
/ 13 октября 2018

Мое намерение - получать события через WebSockets и использовать их при замыканиях main.Это работает, когда сообщения являются чистым текстом (String), но идея десериализации этого текста в некоторые структуры.

В этом примере я добавил только Data, Error и Event, но в других случаях все может быть иначе, поэтому я использовал дженерики для этого, но я немного растерялся.Компилятор предложил несколько вещей, которые я пробовал, но я не знаю, как «принудительно» заставить сообщение быть преобразовано в определенный тип (Data в этом примере, но EventManager может использоваться в других частях, поэтому он должен быть общим).

Я прикрепил этот код, который пытается показать мою идею, хотя он не компилируется:

events.rs:

use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;

use serde::de;
use serde_json::from_str;

struct MyActor<T> {
    manager: EventManager<T>,
}

impl<T: 'static> Actor for MyActor<T> {
    type Context = Context<Self>;
}

impl<T: 'static> StreamHandler<Message, ProtocolError> for MyActor<T> {
    fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
        match msg {
            Message::Text(text) => {
                debug!("Received {}", text);

                for idx in 0..self.manager.events.len() {
                    let data =
                        from_str(&text).expect(&format!("Error when deserializing {:?}", text));
                    (self.manager.events[idx].handler)(data)
                }
            }
            _ => panic!(),
        }
    }
}

pub struct Event<T> {
    handler: Box<Fn(T) + 'static>,
}

pub struct EventManager<T> {
    events: Vec<Event<T>>,
}

impl<T: 'static> EventManager<T>
where
    T: serde::Deserialize<'static>,
{
    pub fn new() -> Self {
        Self { events: vec![] }
    }

    pub fn capture<F>(&mut self, function: F)
    where
        F: for<'h> Fn(T) + 'static,
    {
        let event = Event {
            handler: Box::new(function),
        };
        self.events.push(event);
    }

    pub fn run(self) {
        let runner = System::new("example");

        debug!("run");

        Arbiter::spawn(
            Client::new("example")
                .connect()
                .map(|(reader, _writer)| {
                    MyActor::create(|ctx| {
                        MyActor::add_stream(reader, ctx);
                        MyActor { manager: self }
                    });
                })
                .map_err(|err| {}),
        );

        runner.run();
    }
}

main.rs:

#[macro_use]
extern crate log;
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;

pub mod events;

use actix::*;
use serde::de;
use serde::de::{Deserialize, Deserializer};

use events::EventManager;

#[derive(Debug, Message, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Data {
    Error(Error),
    Event(Event),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Error {
    message: String,
    code: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Event {
    name: String,
    content: String,
}

fn main() {
    env_logger::init();

    let mut client = EventManager::<Data>new();

    client.capture(|data| debug!("event: {:?}", data));
    client.run();
}

Весь код можно увидеть в https://github.com/foochi/how-deserialize-within-actix

1 Ответ

0 голосов
/ 16 октября 2018

Существуют некоторые исправления для его компиляции.

Хитрость для его компиляции заключается в использовании границ черт высшего ранга (HTRB) границ черт вместо объявления 'static продолжительности жизни.

Следуйте предложению компилятора и привяжите черту T: serde::Deserialize<'_>:

impl<T> StreamHandler<Message, ProtocolError> for MyActor<T>
where
    for<'de> T: serde::Deserialize<'de> + 'static,

Затем измените также черту Deserialize<'static>, связанную с имплией EventManager, с чертой HTRB, связанной сполучить его совместимым с требованием реализации StreamHandler:

impl<T: 'static> EventManager<T>
where
    for<'de> T: serde::Deserialize<'de>,

Наконец, если вы исправите строку, создайте клиент с правильным синтаксисом:

let mut client: EventManager<Data> = EventManager::new();

пример кода долженcompile.

Примечание. Для capture использование более высокой границы признака для объявления требования Fn является излишним, просто:

pub fn capture<F>(&mut self, function: F)
where
    F: Fn(T) + 'static,
...