Java: как общаться с несколькими клиентами в одном потоке - PullRequest
3 голосов
/ 03 марта 2012

У меня есть список сокетов, принадлежащих одному потоку.Но мне интересно, есть ли реальный способ общения (чтения / записи от / до) с этими клиентами?Я не хочу создавать один поток для каждого клиента, потому что пользователей может быть слишком много, а создание одного потока для каждого из них может быть слишком дорогостоящим.

Ответы [ 5 ]

4 голосов
/ 03 марта 2012

Я бы сказал, что NIO - ваш лучший выбор здесь. Ознакомьтесь с одним из многих хороших руководств по NIO-коммуникациям через сокет (ну, SocketChannel)!

Я считаю, что это учебник, который я использовал, когда мне нужно было изучить NIO: http://rox -xmlrpc.sourceforge.net / niotut /

3 голосов
/ 03 марта 2012

Просто используйте стандартный Java NIO Лучшая документация написана на главной странице Java http://docs.oracle.com/javase/6/docs/technotes/guides/io/index.html. Есть API документация, образцы, учебник. Все. Я обещаю вам, что это достаточно - у меня есть опыт написания программного обеспечения, где 10k клиентов были подключены к одному клиенту (несколько потоков). Вы должны помнить только об ограничении ОС, чтобы изменить его в конфигурации.

3 голосов
/ 03 марта 2012

Netty - платформа сокетов клиентского сервера Java NIO http://www.jboss.org/netty

1 голос
/ 11 октября 2014

Мне пришлось подключить несколько IP-адресов серверов: ПОРТЫ и сделать обмен запросами-ответами. После реализации традиционного ввода-вывода с несколькими потоками и сторожевым таймером блокированные сокеты сдавались. Я реализовал NIO, и это мое тестовое приложение для дальнейшего использования.

Я могу открыть N соединений с тайм-аутом, прочитать ответ с тайм-аутом, написать команду с тайм-аутом все в простом однопоточном «игровом цикле». Если бы мне был нужен параллелизм, я мог бы создавать рабочие потоки, но это не обязательно, если логике приложения это не нужно.

Сервер - это пользовательское приложение telnet, клиенты пишут команды и читают текстовые строки, пока не будет найден запрос строки терминатора. Терминатор отмечает end_of_response_packet.

import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class ClientSocketNIO {
    private String host;
    private int port;
    private String charset;
    private ByteArrayOutputStream inBuffer;
    private ByteBuffer buf;
    private Selector selector;
    private SocketChannel channel;

    public ClientSocketNIO(String host, int port, String charset) {
        this.charset = charset==null || charset.equals("") ? "UTF-8" : charset;
        this.host = host;
        this.port = port;
    }

    public void open(long timeout) throws IOException {
        selector = Selector.open();
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_CONNECT);
        channel.connect(new InetSocketAddress(host, port));
        inBuffer = new ByteArrayOutputStream(1024);
        buf = ByteBuffer.allocate(1*1024);
        long sleep = Math.min(timeout, 1000);
        while(timeout > 0) {
            if (selector.select(sleep) < 1) {
                timeout-=sleep;
                continue;
            }
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while(keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid() || !key.isConnectable()) continue;
                SocketChannel channel = (SocketChannel)key.channel();
                if (channel.isConnectionPending()) {
                    channel.finishConnect();
                    channel.configureBlocking(false);
                    return; // we are ready to receive bytes
                }
            }
        }
        throw new IOException("Connection timed out");
    }

    public void close() {
        try { channel.close(); } catch(Exception ex) { }
        try { selector.close(); } catch(Exception ex) { }
        inBuffer=null;
        buf=null;
    }   

    public List<String> readUntil(String terminator, long timeout, boolean trimLines) throws IOException {
        return readUntil(new String[]{terminator}, timeout, trimLines);
    }

    public List<String> readUntil(String[] terminators, long timeout, boolean trimLines) throws IOException {
        List<String> lines = new ArrayList<String>(12);
        inBuffer.reset();

        // End of packet terminator strings, line startsWith "aabbcc" string.
        byte[][] arrTerminators = new byte[terminators.length][];
        int[] idxTerminators = new int[terminators.length];
        for(int idx=0; idx < terminators.length; idx++) {
            arrTerminators[idx] = terminators[idx].getBytes(charset);
            idxTerminators[idx] = 0;
        }
        int idxLineByte=-1;

        channel.register(selector, SelectionKey.OP_READ);
        long sleep = Math.min(timeout, 1000);
        while(timeout>0) {
            if (selector.select(sleep) < 1) {
                timeout-=sleep;
                continue;
            }
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while(keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid() || !key.isReadable()) continue;
                SocketChannel channel = (SocketChannel)key.channel();
                buf.clear();
                int len = channel.read(buf);
                System.out.println("read " + len);
                if (len == -1) throw new IOException("Socket disconnected");
                buf.flip();
                for(int idx=0; idx<len; idx++) {
                    byte cb = buf.get(idx);
                    if (cb!='\n') {
                        idxLineByte++;
                        inBuffer.write(cb);
                        for(int idxter=0; idxter < arrTerminators.length; idxter++) {
                            byte[] arrTerminator = arrTerminators[idxter];
                            if (idxLineByte==idxTerminators[idxter]
                                    && arrTerminator[ idxTerminators[idxter] ]==cb) {
                                idxTerminators[idxter]++;
                                if (idxTerminators[idxter]==arrTerminator.length)
                                    return lines;
                            } else idxTerminators[idxter]=0;
                        }
                    } else  {
                        String line = inBuffer.toString(charset);
                        lines.add(trimLines ? line.trim() : line);
                        inBuffer.reset();
                        idxLineByte=-1;
                        for(int idxter=0; idxter<arrTerminators.length; idxter++)
                            idxTerminators[idxter]=0;
                    }
                }
            }
        }
        throw new IOException("Read timed out");
    }

    public void write(String data, long timeout) throws IOException {
        ByteBuffer outBuffer = ByteBuffer.wrap(data.getBytes(charset));
        channel.register(selector, SelectionKey.OP_WRITE);
        long sleep = Math.min(timeout, 1000);
        while(timeout > 0) {
            if (selector.select(sleep) < 1) {
                timeout-=sleep;
                continue;
            }
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while(keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid() || !key.isWritable()) continue;
                SocketChannel channel = (SocketChannel)key.channel();
                int len = channel.write(outBuffer);
                System.out.println("write " + len);
                if (outBuffer.remaining()<1)
                    return;
            }
        }
        throw new IOException("Write timed out");
    }

    public static void main(String[] args) throws Exception {
        ClientSocketNIO client = new ClientSocketNIO("11.22.33.44", 1234, "UTF-8");
        try {
            client.open(15000);

            // read prompting for username
            List<String> reply = client.readUntil("User: ", 15000, true);
            for(int idx=0; idx<reply.size(); idx++)
                System.out.println("|"+reply.get(idx)+"|");         

            // write username and read a success or failed prompt(asks username once again),
            // this one may return two different terminator prompts so listen for both
            client.write("myloginname\n", 15000);
            reply = client.readUntil(new String[]{"> ", "User: "}, 15000, true);
            for(int idx=0; idx<reply.size(); idx++)
                System.out.println("|"+reply.get(idx)+"|");
            if (!reply.get(reply.size()-1).startsWith("Welcome ")) return; // Access denied

            System.out.println("-----");
            client.write("help\n", 15000);
            reply = client.readUntil("> ", 15000, true);
            for(int idx=0; idx<reply.size(); idx++)
                System.out.println("|"+reply.get(idx)+"|");

            System.out.println("-----");
            client.write("get status\n", 15000);
            reply = client.readUntil("> ", 15000, true);
            for(int idx=0; idx<reply.size(); idx++)
                System.out.println("|"+reply.get(idx)+"|");

            System.out.println("-----");
            client.write("get list\n", 15000);
            reply = client.readUntil("> ", 15000, true);
            for(int idx=0; idx<reply.size(); idx++)
                System.out.println("|"+reply.get(idx)+"|");

            client.write("quit\n", 15000);
        } finally {
            client.close();
        }
    }

}
1 голос
/ 03 марта 2012

вы можете использовать подход NIO в JRE. Другое решение использует Space Architecture. В этой архитектуре существуют глобальные пространства с именем Space и любой запрос на запись в эти пространства, затем другие потоки читают из этих пространств и обрабатывают его и записывают результат процесса в другое пространство, а в последнем шаге запроса потока читают собственный результат из указанного пространства.

Вы можете увидеть следующую ссылку для получения дополнительной информации:

http://en.wikipedia.org/wiki/Space_architecture

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...