Java: многопоточность и программирование сокетов UDP - PullRequest
4 голосов
/ 22 апреля 2010

Я новичок в многопоточности и программировании сокетов в Java. Я хотел бы знать, как лучше всего реализовать 2 потока - один для получения сокета и один для отправки сокета. Если то, что я пытаюсь сделать, звучит абсурдно, пожалуйста, дайте мне знать, почему! Код в значительной степени вдохновлен учебниками Sun. Я хочу использовать многоадресные сокеты, чтобы я мог работать с многоадресной группой.

class Server extends Thread
{

    static protected MulticastSocket socket = null;
    protected BufferedReader in = null;
    public InetAddress group;

    private static class Receive implements Runnable
    {

        public void run()
        {
            try
            {
                byte[] buf = new byte[256];
                DatagramPacket pkt = new DatagramPacket(buf,buf.length);
                socket.receive(pkt);
                String received = new String(pkt.getData(),0,pkt.getLength());
                System.out.println("From server@" + received);          
                Thread.sleep(1000);
            }
            catch (IOException e)
            { 
                System.out.println("Error:"+e);
            }   
            catch (InterruptedException e)
            { 
                System.out.println("Error:"+e);
            }   

        }

    }


    public Server() throws IOException
    {
        super("server");
        socket = new MulticastSocket(4446);
        group = InetAddress.getByName("239.231.12.3");
        socket.joinGroup(group);
    }

    public void run()
    {

        while(1>0)
        {   
            try
            {
                byte[] buf = new byte[256];
                DatagramPacket pkt = new DatagramPacket(buf,buf.length);        
                //String msg = reader.readLine();
                String pid = ManagementFactory.getRuntimeMXBean().getName();
                buf = pid.getBytes();
                pkt = new DatagramPacket(buf,buf.length,group,4446);
                socket.send(pkt);
                Thread t = new Thread(new Receive());
                t.start();

                while(t.isAlive())
                { 
                    t.join(1000);
                }
                sleep(1);
            }
            catch (IOException e)
            { 
                System.out.println("Error:"+e);
            }   
            catch (InterruptedException e)
            { 
                System.out.println("Error:"+e);
            }   

        }
        //socket.close();
    }

    public static void main(String[] args) throws IOException
    {
        new Server().start();
        //System.out.println("Hello");
    }

}

Ответы [ 4 ]

9 голосов
/ 22 апреля 2010

Во-первых: ваши занятия должны начинаться с заглавной буквы в соответствии с Соглашениями об именах Java :

Имена классов должны быть существительными в смешанном регистре с первой буквой каждое внутреннее слово пишется с заглавной буквы. Пробовать оставьте ваши имена классов простыми и описательный. Используйте целые слова - избегайте сокращения и аббревиатуры (если аббревиатура гораздо более широко используется чем длинная форма, такая как URL или HTML).

Второе: Попробуйте разбить код на последовательные разделы и организовать их вокруг некоторой общей функции, с которой вы имеете дело ... возможно, вокруг функциональности или модели, которую вы программируете.

(базовая) модель сервера заключается в том, что only делает только то, что получает соединения с сокетом ... сервер использует обработчик для обработки этих соединений, и все. Если вы попытаетесь построить эту модель, она будет выглядеть примерно так:

class Server{
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    public Server(int port, int poolSize) throws IOException {
      serverSocket = new ServerSocket(port);
      pool = Executors.newFixedThreadPool(poolSize);
    }

    public void serve() {
      try {
        while(true) {
          pool.execute(new Handler(serverSocket.accept()));
        }
      } catch (IOException ex) {
        pool.shutdown();
      }
    }
  }

  class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
      // receive the datagram packets
    }
 }

Третье: Я бы порекомендовал вам взглянуть на некоторые существующие примеры.

Обновлено за комментарии:
ОК, Рави, есть некоторые большие проблемы с вашим кодом и некоторые незначительные проблемы с ним:

  1. Я предполагаю, что класс Receive является вашим клиентом ... вы должны извлечь его как отдельную программу (со своим собственным основным классом) и одновременно запустить сервер и несколько клиентов. Создание нового «клиентского потока» с вашего сервера для каждого нового отправляемого вами UDP-пакета вызывает беспокойство (проблема big ).

  2. Когда вы создаете свое клиентское приложение, вы должны заставить его запускать принимающий код в своем собственном цикле while ( незначительный выпуск), например:

    public class Client extends Thread
    {
        public Client(/*..*/)
        {
            // initialize your client
        }
    
        public void run()
        {
            while(true)
            {
                // receive UDP packets
                // process the UDP packets
            }
        }
    
        public static void main(String[] args) throws IOException
        {
            // start your client
            new Client().start();
        }
    }
    
  3. Вам может понадобиться только один поток на клиента и один поток на сервер (технически у вас даже нет отдельного потока, поскольку у main есть собственный поток), поэтому вы можете не найти ExecutorService, который полезно.

В противном случае ваш подход верен ... но я бы все же рекомендовал бы вам проверить некоторые примеры.

2 голосов
/ 22 апреля 2010

Желание создавать темы в приложении не абсурдно! Вам не понадобятся ровно 2 потока, но я думаю, что вы говорите о 2 классах, которые реализуют интерфейс Runnable.

API потоков стал лучше с Java 1.5, и вам больше не нужно связываться с java.lang.Thread. Вы можете просто создать java.util.concurrent.Executor и отправить ему Runnable экземпляры.

Книга Параллелизм Java на практике использует именно эту проблему - создание сервера с многопоточными сокетами - и проходит несколько итераций кода, чтобы показать лучший способ сделать это. Проверьте главу бесплатного образца, которая великолепна. Я не буду копировать / вставлять код здесь, но посмотрите конкретно на листинг 6.8.

1 голос
/ 23 июня 2011

Хорошо, что история Eclipse работает даже целый день назад. Благодаря этому я могу привести и Рави, и Лирик, его ответ на утечку.

Позвольте мне сначала начать сзаявив, что я понятия не имею, что вызывает эту утечку, но если я оставлю ее достаточно долго, она не будет работать на OutOfMemoryError .

Во-вторых, я оставил рабочий код закомментированным для Raviдля рабочего базового примера моего сервера UDP.Время ожидания истекло, чтобы проверить, как долго мой брандмауэр убьет конец получателей (30 секунд).Просто удалите что-нибудь с пулом, и все готово.

Итак, рабочая, но утечка версии моего примера многопоточного UDP-сервера.

public class TestServer {

private static Integer TIMEOUT = 30;
private final static int MAX_BUFFER_SIZE = 8192;
private final static int MAX_LISTENER_THREADS = 5;
private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ");

private int mPort;
private DatagramSocket mSocket;

// You can remove this for a working version
private ExecutorService mPool;

public TestServer(int port) {
    mPort = port;
    try {
        mSocket = new DatagramSocket(mPort);
        mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE);
        mSocket.setSendBufferSize(MAX_BUFFER_SIZE);
        mSocket.setSoTimeout(0);

        // You can uncomment this for a working version
        //for (int i = 0; i < MAX_LISTENER_THREADS; i++) {
        //  new Thread(new Listener(mSocket)).start();
        //}

        // You can remove this for a working version
        mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS);

    } catch (IOException e) {
        e.printStackTrace();
    }
}

// You can remove this for a working version
public void start() {
    try {
        try {
            while (true) {
                mPool.execute(new Listener(mSocket));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    } finally {
        mPool.shutdown();
    }
}

private class Listener implements Runnable {

    private final DatagramSocket socket;

    public Listener(DatagramSocket serverSocket) {
        socket = serverSocket;
    }

    private String readLn(DatagramPacket packet) throws IOException {
        socket.receive(packet);
        return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine();
    }

    private void writeLn(DatagramPacket packet, String string) throws IOException {
        packet.setData(string.concat("\r\n").getBytes());
        socket.send(packet);
    }

    @Override
    public void run() {
        DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE);
        String s;
        while (true) {
            try {
                packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE);
                s = readLn(packet);
                System.out.println(DateFormat.format(new Date()) + " Received: " + s);
                Thread.sleep(TIMEOUT * 1000);
                writeLn(packet, s);
                System.out.println(DateFormat.format(new Date()) + " Sent: " + s);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public static void main(String[] args) {
    if (args.length == 1) {
        try {
            TIMEOUT = Integer.parseInt(args[0]);
        } catch (Exception e) {
            TIMEOUT = 30;
        }
    }
    System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT);
    //new TestServer(4444);
    new TestServer(4444).start();
}
}

btw.@Lirik, я впервые увидел это в Eclipse, после чего протестировал его из командной строки.И опять же, я понятия не имею, что вызывает это;) извините ...

0 голосов
/ 24 апреля 2010

2 темы в порядке. Один читатель, другой писатель. Помните, что с UDP вы не должны создавать новые потоки обработчика (если то, что вы делаете, занимает много времени), я рекомендую бросать входящие сообщения в очередь обработки. То же самое для отправки, есть поток отправки, который блокирует входящую очередь для отправки UDP.

...