Использование потоков для обработки сокетов - PullRequest
1 голос
/ 13 мая 2010

Я работаю над Java-программой, которая по сути является чатом. Это задание для класса, поэтому без кода, пожалуйста, у меня просто есть некоторые проблемы, определяющие наиболее реальный способ справиться с тем, что мне нужно сделать. У меня есть серверная программа, уже настроенная для одного клиента, использующего потоки для получения потока ввода данных и поток для обработки отправки в потоке вывода данных. Теперь мне нужно создать новую ветку для каждого входящего запроса.

Моя мысль - создать связанный список, содержащий либо клиентские сокеты, либо, возможно, поток. Где я спотыкаюсь, так это выясняю, как справиться с отправкой сообщений всем клиентам. Если у меня есть поток для каждого входящего сообщения, как я могу потом развернуться и отправить его в каждый клиентский сокет.

Я думаю, что если бы у меня был связанный список клиентских пакетов, я мог бы затем просмотреть список и отправить его каждому, но тогда мне пришлось бы каждый раз создавать dataoutputstream. Могу ли я создать связанный список dataoutputstreams? Извините, если это звучит так, будто я болтаю, но я не хочу просто начинать кодировать это, это может стать грязным без хорошего плана. Спасибо!

EDIT Я решил опубликовать код, который у меня есть. У меня еще не было возможности протестировать его, поэтому любые комментарии были бы отличными. Спасибо!

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class prog4_server {

    // A Queue of Strings used to hold out bound Messages
    // It blocks till on is available
    static BlockingQueue<String> outboundMessages = new LinkedBlockingQueue<String>();

    // A linked list of data output streams
    // to all the clients
    static LinkedList<DataOutputStream> outputstreams;

    // public variables to track the number of clients
    // and the state of the server
    static Boolean serverstate = true;
    static int clients = 0;

    public static void main(String[] args) throws IOException{

        //create a server socket and a clientSocket
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(6789);
        } catch (IOException e) {
            System.out.println("Could not listen on port: 6789");
            System.exit(-1);
        }// try{...}catch(IOException e){...}

        Socket clientSocket;

        // start the output thread which waits for elements
        // in the message queue
        OutputThread out = new OutputThread();
        out.start();

        while(serverstate){

            try {

                // wait and accept a new client
                // pass the socket to a new Input Thread
                clientSocket = serverSocket.accept();
                DataOutputStream ServerOut = new DataOutputStream(clientSocket.getOutputStream());
                InputThread in = new InputThread(clientSocket, clients);
                in.start();
                outputstreams.add(ServerOut);

            } catch (IOException e) {

                System.out.println("Accept failed: 6789");
                System.exit(-1);
            }// try{...}catch{..}

            // increment the number of clients and report
            clients = clients++;

            System.out.println("Client #" + clients + "Accepted");

        }//while(serverstate){...

    }//public static void main

    public static class OutputThread extends Thread {

        //OutputThread Class Constructor
        OutputThread() {
        }//OutputThread(...){...

        public void run() {

            //string variable to contain the message
            String msg = null;

            while(!this.interrupted()) {

                try {

                    msg = outboundMessages.take();

                    for(int i=0;i<outputstreams.size();i++){

                        outputstreams.get(i).writeBytes(msg + '\n');

                    }// for(...){...

                 } catch (IOException e) {

                    System.out.println(e);

                 } catch (InterruptedException e){

                     System.out.println(e);

                 }//try{...}catch{...}

            }//while(...){

        }//public void run(){...

    }// public OutputThread(){...

    public static class InputThread extends Thread {

        Boolean threadstate = true;
        BufferedReader ServerIn;
        String user;
        int threadID;
        //SocketThread Class Constructor
        InputThread(Socket clientSocket, int ID) {

            threadID = ID;

            try{
                ServerIn = new BufferedReader(
                    new InputStreamReader(clientSocket.getInputStream()));
                    user = ServerIn.readLine();
            }
            catch(IOException e){
                System.out.println(e);
            }

        }// InputThread(...){...

        public void run() {

            String msg = null;

        while (threadstate) {

                try {

                    msg = ServerIn.readLine();

                    if(msg.equals("EXITEXIT")){

                        // if the client is exiting close the thread
                        // close the output stream with the same ID
                        // and decrement the number of clients
            threadstate = false;
                        outputstreams.get(threadID).close();
                        outputstreams.remove(threadID);
                        clients = clients--;
                        if(clients == 0){
                            // if the number of clients has dropped to zero
                            // close the server
                            serverstate = false;
                            ServerIn.close();
                        }// if(clients == 0){...
                    }else{

                        // add a message to the message queue
                        outboundMessages.add(user + ": " + msg);

                    }//if..else...

                } catch (IOException e) {

                    System.out.println(e);

                }// try { ... } catch { ...}

        }// while

        }// public void run() { ...
    }

    public static class ServerThread extends Thread {

        //public variable declaration
        BufferedReader UserIn =
                new BufferedReader(new InputStreamReader(System.in));

        //OutputThread Class Constructor
        ServerThread() {

        }//OutputThread(...){...

        public void run() {

            //string variable to contain the message
            String msg = null;

            try {

                //while loop will continue until
                //exit command is received
                //then send the exit command to all clients

                msg = UserIn.readLine();

                while (!msg.equals("EXITEXIT")) {

                    System.out.println("Enter Message: ");
                    msg = UserIn.readLine();

                }//while(...){

                outboundMessages.add(msg);
                serverstate = false;
                UserIn.close();

            } catch (IOException e) {
                System.out.println(e);

            }//try{...}catch{...}


        }//public void run(){...
    }// public serverThread(){...

}// public class prog4_server

1 Ответ

3 голосов
/ 13 мая 2010

Я решил эту проблему в прошлом, определив класс "MessageHandler" для каждого клиентского соединения, который отвечает за входящий / исходящий трафик сообщений. Внутренне обработчик использует реализацию BlockingQueue, в которую помещаются исходящие сообщения (внутренними рабочими потоками). Поток отправителя ввода / вывода постоянно пытается прочитать из очереди (при необходимости блокируя) и отправляет каждое сообщение, полученное клиенту.

Вот пример кода скелета (не проверен):

/**
 * Our Message definition.  A message is capable of writing itself to
 * a DataOutputStream.
 */
public interface Message {
  void writeTo(DataOutputStream daos) throws IOException;
}

/**
 * Handler definition.  The handler contains two threads: One for sending
 * and one for receiving messages.  It is initialised with an open socket.
 */    
public class MessageHandler {
  private final DataOutputStream daos;
  private final DataInputStream dais;
  private final Thread sender;
  private final Thread receiver;
  private final BlockingQueue<Message> outboundMessages = new LinkedBlockingQueue<Message>();

  public MessageHandler(Socket skt) throws IOException {
    this.daos = new DataOutputStream(skt.getOutputStream());
    this.dais = new DataInputStream(skt.getInputStream());

    // Create sender and receiver threads responsible for performing the I/O.
    this.sender = new Thread(new Runnable() {
      public void run() {
        while (!Thread.interrupted()) {
          Message msg = outboundMessages.take(); // Will block until a message is available.

          try {
            msg.writeTo(daos);
          } catch(IOException ex) {
            // TODO: Handle exception
          }
        }
      }
    }, String.format("SenderThread-%s", skt.getRemoteSocketAddress()));

    this.receiver = new Thread(new Runnable() {
      public void run() {
        // TODO: Read from DataInputStream and create inbound message.
      }
    }, String.format("ReceiverThread-%s", skt.getRemoteSocketAddress()));

    sender.start();
    receiver.start();
  }

  /**
   * Submits a message to the outbound queue, ready for sending.
   */
  public void sendOutboundMessage(Message msg) {
    outboundMessages.add(msg);
  }

  public void destroy() {
    // TODO: Interrupt and join with threads.  Close streams and socket.
  }
}

Обратите внимание, что Николай прав в том, что блокировка ввода-вывода с использованием 1 (или 2) потоков на соединение не является масштабируемым решением, и обычно приложения могут быть написаны с использованием Java NIO, чтобы обойти это. Однако на самом деле, если вы не пишете корпоративный сервер, к которому одновременно подключаются тысячи клиентов, это на самом деле не проблема. Написание безошибочных масштабируемых приложений с использованием Java NIO сложно и, конечно, я бы не рекомендовал.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...