Как я могу поделиться или избежать совместного использования ресурса websocket между двумя потоками? - PullRequest
0 голосов
/ 02 марта 2020

Я использую вольфрамит для создания сервера чата, и способ, которым я хочу это сделать, основан на том, чтобы множество потоков связывались друг с другом через mps c. Я хочу создать новый поток для каждого пользователя, который подключается к серверу и подключает его к веб-сокету, а также должен иметь возможность чтения этого потока с mps c, чтобы сервер мог отправлять сообщения через это подключение.

Проблема в том, что чтение mps c блокирует поток, но я не могу заблокировать поток, если я хочу читать из него. Единственное, что я мог придумать, чтобы обойти это, - это создать два потока, один для входящих и один для исходящих сообщений, но для этого мне нужно поделиться своим подключением через веб-сокет с обоими работниками, что, конечно, я не могу сделать.

Вот сильно урезанная версия моего кода, где я пытаюсь сделать двух рабочих в ответвлении Action :: Connect оператора match, что дает error[E0382]: use of moved value: 'websocket' для попытки переместить его в закрытие второго рабочего:

extern crate tungstenite;
extern crate workerpool;

use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{self, Sender, Receiver};
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use tungstenite::server::accept;

pub enum Action {
  Connect(TcpStream),
  Send(String),
}

fn main() {
  let (main_send, main_receive): (Sender<Action>, Receiver<Action>) = mpsc::channel();
  let worker_pool = Pool::<ThunkWorker<()>>::new(8);
  {
    // spawn thread to listen for users connecting to the server
    let main_send = main_send.clone();
    worker_pool.execute(Thunk::of(move || {
      let listener = TcpListener::bind(format!("127.0.0.1:{}", 8080)).unwrap();
      for (_, stream) in listener.incoming().enumerate() {
        main_send.send(Action::Connect(stream.unwrap())).unwrap();
      }
    }));
  }

  let mut users: Vec<Sender<String>> = Vec::new();

  // process actions from children
  while let Some(act) = main_receive.recv().ok() {
    match act {
      Action::Connect(stream) => {
        let mut websocket = accept(stream).unwrap();
        let (user_send, user_receive): (Sender<String>, Receiver<String>) = mpsc::channel();
        let main_send = main_send.clone();

        // thread to read user input and propagate it to the server
        worker_pool.execute(Thunk::of(move || {
          loop {
            let message = websocket.read_message().unwrap().to_string();
            main_send.send(Action::Send(message)).unwrap();
          }
        }));

        // thread to take server output and propagate it to the server
        worker_pool.execute(Thunk::of(move || {
          while let Some(message) = user_receive.recv().ok() {
            websocket.write_message(tungstenite::Message::Text(message.clone())).unwrap();
          }
        }));
        users.push(user_send);
      }
      Action::Send(message) => {
        // take user message and echo to all users
        for user in &users {
          user.send(message.clone()).unwrap();
        }
      }
    }
  }
}

Если я создаю только один поток как для входа, так и для вывода в этом рукаве, то user_receive.recv () блокирует поток, поэтому я не могу читать сообщения с помощью websocket.read_message () до тех пор, пока не получу mps c сообщение из основного потока. Как я могу решить обе проблемы? Я подумывал о клонировании веб-сокета, но он не реализует Clone, и я не знаю, является ли разумным попытаться сделать новое соединение с тем же потоком, это кажется хакерским.

1 Ответ

1 голос
/ 03 марта 2020

Проблема в том, что чтение mps c блокирует поток

. Вы можете использовать try_recv , чтобы избежать блокировки потока. Другая реализация mps c - это crossbeam_channel . Этот проект рекомендуется замена даже автором mps c

Я хочу создать новый поток для каждого пользователя, который подключается к серверу

Я думаю, что подход asyn/await будет намного лучше с большинства перспектив, чем thread per client. Вы можете прочитать больше об этом там

...