Я хотел изучить Rust и поэтому решил использовать его для реального проекта.
Идея состоит в том, чтобы иметь сервер, который
от основной поток A
порождает новый поток B
, который выполняет некоторую асинхронную задачу c, которая генерирует поток значений за время
получает клиентские соединения websocket [c, d, e, ..]
асинхронно и обрабатывает они одновременно порождают новые потоки [C, D, E, ...]
отправляет значения, полученные в потоке B, в потоки [C, D, E, ...]
каждый поток в [C, D, E, ...]
публикует значение для соответствующего клиента в [c, d, e, ..]
Я использую
tokio
для создания новых тем и tokio::sync::mpsc::unbounded_channel
для отправить значения, вычисленные в B
, в другие потоки
tokio_tungstenite
для управления подключениями через веб-сокет и отправки значений клиентам
I удалось получить рабочий пример, где поток B
производит целые числа и фиксированные временные интервалы. Когда сервер запускается, B
начинает генерировать поток значений [0,1,2,3, ..]
.
Когда открывается новое соединение с веб-сокетом, клиент получит поток данных, начиная со значения, созданного после того, как соединение открыт (так что если соединение начинается после того, как значение 3
произведено B
, то клиент получит значения начиная с 4
и далее).
Вот подвох.
Единственный способ, с помощью которого я нашел приемную часть канала в C
, для получения значений асинхронно (и, следовательно, не позволяя ей буферизовать значения и отправлять их в c
только тогда, когда B
полностью завершен), это использовать Я считаю, что oop потребляет 100% ресурсов ЦП.
Я заметил, что из-за этого каждое соединение с веб-сокетом будет потреблять 100% ресурсов ЦП (поэтому при наличии двух открытых соединений загрузка ЦП будет составлять 200% и и так далее.
Вот l oop:
loop {
while let Ok(v) = rx.try_recv() {
println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
let mess = Message::Text(v.to_string());ws_sender.send(mess).await?;
}
Если я использую recv()
(вместо try_recv()
), значения будут буферизованы и переданы в bsocket только когда B
сделано.
Я пытался использовать futures_channel::unbounded
вместо tokio
канала, но у меня та же проблема с буфером.
ВОПРОС: как переписать вышеизложенное l oop, чтобы избежать использования 100% и потоковых значений в веб-сокет без блокировки?
Вы можете увидеть сервер Tokio здесь: https://github.com/ceikit/async_data/blob/master/src/bin/tokio_server.rs
вы можете проверить его вращая соединение через веб-сокет в другом окне терминала с запущенным клиентом