Реестр Actix SyncArbiter - PullRequest
0 голосов
/ 07 апреля 2019

Я пытаюсь реализовать пул из 10 Redis соединений, используя SyncArbiter для использования различными актерами.Скажем, у нас есть актер по имени Боб, который должен использовать актера Redis для выполнения своей задачи.

Хотя это достижимо следующим образом:

// crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
    pub redis: Addr<Redis>,
    pub bob: Addr<Bob>
}

fn main() {
    let system = actix::System::new("theatre");

    server::new(move || {
        let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
        let bob_addr = SyncArbiter::start(10, || Bob::new());

        let state = AppState {
            redis: redis_addr,
            bob: bob_addr
        };

        App::with_state(state).resource("/bob/eat", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::bob::eat)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
  name: String,
  kcal: u64
}

pub fn eat(
    (req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
    state
        .bob
        .send(Eat::new(req.into_inner()))
        .from_err()
        .and_then(|res| match res {
            Ok(val) => {
                println!("==== BODY ==== {:?}", val);
                Ok(HttpResponse::Ok().into())
            }
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
    pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
    pub fn new(cmd: Cmd) -> Self {
        RunCommand(cmd)
    }
}

impl Message for RunCommand {
    type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
    type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
    type Result = Result<RedisResult<String>, ()>;

    fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
        println!("Redis received command!");
        Ok(Ok("OK".to_string()))
    }
}

impl Redis {
    pub fn new(url: &str) -> Result<Self, RedisError> {
        let client = match Client::open(url) {
            Ok(client) => client,
            Err(error) => return Err(error)
        };

        let redis = Redis {
            client: client,
        };

        Ok(redis)
    }
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
    type Result = Result<Bob, ()>;
}

impl Actor for Eat {
    type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        println!("Bob received {:?}", &msg);

        // How to get a Redis actor and pass data to it here?

        Ok(msg.datapoint)
    }
}

impl Bob {
    pub fn new() -> () {
        Bob {}
    }
}

Из приведенной выше реализации дескриптора вБоб, неясно, как Боб мог получить адрес актера Redis.Или отправьте любое сообщение любому Actor, работающему в SyncArbiter.

То же самое может быть достигнуто с использованием обычных Arbiter и Registry, но, насколько мне известно, Actix не позволяет использовать несколько одинаковых акторов (например, мы не можем запустить 10 актеров Redis).используя обычный Arbiter).

Чтобы формализовать мои вопросы:

  • Есть ли Registry для SyncArbiter актеров
  • Можно ли начать несколько одинаковыхтип актеров в обычном Arbiter
  • Есть ли лучший / более канонический способ реализации пула соединений

РЕДАКТИРОВАТЬ

Версии:

  • actix 0.7.9
  • actix_web 0.7.19
  • фьючерсы = "0.1.26"
  • ржавчина 1.33.0

1 Ответ

0 голосов
/ 10 апреля 2019

Я нашел ответ сам.

Из коробки нет способа извлечь Actor с SyncContext из реестра.

Учитывая мой пример выше. Чтобы актер Bob отправил актеру Redis сообщение любого типа, ему нужно знать адрес Redis актера. Bob может получить адрес Redis в явном виде - содержится в сообщении, отправленном ему или прочитанном из какого-то общего состояния.

Я хотел систему, аналогичную системе Эрланга, поэтому я решил не передавать адреса актеров через сообщения, так как она казалась слишком трудоемкой, подверженной ошибкам, и, на мой взгляд, она отрицает необходимость иметь модель параллелизма, основанную на акторе (поскольку ни один актер не может сообщение любого другого актера).

Поэтому я исследовал идею общего состояния и решил реализовать свой собственный SyncRegistry, который будет аналогом стандарта Actix Registry - который делает именно то, что я хочу, но не для актеров с SyncContext .

Вот наивное решение, которое я кодировал: https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

Со следующей настройкой:

fn main() {
    let system = actix::System::new("theatre");

    let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
    SyncRegistry::set(addr);
    let addr = SyncArbiter::start(10, || Bob::new());
    SyncRegistry::set(addr);


    server::new(move || {
        let state = AppState {};

        App::with_state(state).resource("/foo", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::foo::create)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

Актер Bob может получить адрес Redis следующим образом из любой точки программы:

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        let redis = match SyncRegistry::<Redis>::get() {
            Some(redis) => redis,
            _ => return Err(())
        };

        let cmd = redis::cmd("XADD")
            .arg("things_to_eat")
            .arg("*")
            .arg("data")
            .arg(&msg.0)
            .to_owned();

        redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
    }
}
...