Rust Multithread Asynchronous Websocket Server - PullRequest
0 голосов
/ 23 апреля 2020

Я хотел изучить Rust и поэтому решил использовать его для реального проекта.

Идея состоит в том, чтобы иметь сервер, который

  1. от основной поток A порождает новый поток B, который выполняет некоторую асинхронную задачу c, которая генерирует поток значений за время

  2. получает клиентские соединения websocket [c, d, e, ..] асинхронно и обрабатывает они одновременно порождают новые потоки [C, D, E, ...]

  3. отправляет значения, полученные в потоке B, в потоки [C, D, E, ...]

  4. каждый поток в [C, D, E, ...] публикует значение для соответствующего клиента в [c, d, e, ..]

Я использую

  • tokio для создания новых тем и tokio::sync::mpsc::unbounded_channel для отправить значения, вычисленные в B, в другие потоки

  • tokio_tungstenite для управления подключениями через веб-сокет и отправки значений клиентам

I удалось получить рабочий пример, где поток B производит целые числа и фиксированные временные интервалы. Когда сервер запускается, B начинает генерировать поток значений [0,1,2,3, ..].

Когда открывается новое соединение с веб-сокетом, клиент получит поток данных, начиная со значения, созданного после того, как соединение открыт (так что если соединение начинается после того, как значение 3 произведено B, то клиент получит значения начиная с 4 и далее).

Вот подвох.

Единственный способ, с помощью которого я нашел приемную часть канала в C, для получения значений асинхронно (и, следовательно, не позволяя ей буферизовать значения и отправлять их в c только тогда, когда B полностью завершен), это использовать Я считаю, что oop потребляет 100% ресурсов ЦП.

Я заметил, что из-за этого каждое соединение с веб-сокетом будет потреблять 100% ресурсов ЦП (поэтому при наличии двух открытых соединений загрузка ЦП будет составлять 200% и и так далее.

Вот l oop:

loop {
    while let Ok(v) = rx.try_recv() {
       println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
       println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
       let mess = Message::Text(v.to_string());ws_sender.send(mess).await?;
}

Если я использую recv() (вместо try_recv()), значения будут буферизованы и переданы в bsocket только когда B сделано.

Я пытался использовать futures_channel::unbounded вместо tokio канала, но у меня та же проблема с буфером.

ВОПРОС: как переписать вышеизложенное l oop, чтобы избежать использования 100% и потоковых значений в веб-сокет без блокировки?

Вы можете увидеть сервер Tokio здесь: https://github.com/ceikit/async_data/blob/master/src/bin/tokio_server.rs

вы можете проверить его вращая соединение через веб-сокет в другом окне терминала с запущенным клиентом

1 Ответ

0 голосов
/ 23 апреля 2020

необходимо изменить thread::sleep для использования futures-timer и sync::Mutex на futures::lock::Mutex, тогда while-let с recv() отлично работает

...