многопоточность с неблокирующими розетками - PullRequest
3 голосов
/ 12 октября 2011

Я пытаюсь реализовать TCP-сервер в Java, используя nio.Это просто с помощью метода выбора Selector, чтобы получить готовые ключи.А затем обрабатывают эти ключи, если они приемлемы, читаемы и так далее.Сервер работает нормально, пока я не использую один поток.Но когда я пытаюсь использовать больше потоков для обработки ключей, ответ сервера замедляется и в итоге перестает отвечать, скажем, после 4-5 запросов.Это все, что я делаю: (Pseudo)

Iterator<SelectionKey> keyIterator =  selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
                SelectionKey readyKey = keyIterator.next();
                if (readyKey.isAcceptable()) {
                    //A new connection attempt, registering socket channel with selector

                } else {
                    Worker.add( readyKey );
                }

Worker - это класс потока, который выполняет ввод / вывод из канала.Это код моего класса Worker:

private static List<SelectionKey> keyPool = Collections.synchronizedList(new LinkedList());

public static void add(SelectionKey key) {
    synchronized (keyPool) {
        keyPool.add(key);
        keyPool.notifyAll();
    }
}


public void run() {
    while ( true ) {

        SelectionKey myKey = null;
        synchronized (keyPool) {
            try {
                while (keyPool.isEmpty()) {
                    keyPool.wait();
                }
            } catch (InterruptedException ex) {                    
            }
            myKey = keyPool.remove(0);
            keyPool.notifyAll();
        }

        if (myKey != null && myKey.isValid() ) {

            if (myKey.isReadable()) {
                //Performing reading
            } else if (myKey.isWritable()) {
                //performing writing
                myKey.cancel();  
            }
        }
    }

Моя основная идея - добавить ключ к keyPool, из которого различные потоки могут получить ключ, по одному за раз.Мой класс BaseServer сам работает как поток.Он создает 10 рабочих потоков до начала цикла обработки событий.Я также попытался увеличить приоритет потока BaseServer, чтобы у него было больше шансов принять приемлемые ключи.Тем не менее, он перестает отвечать после примерно 8 запросов.Пожалуйста, помогите, если я ошибаюсь.Заранее спасибо.:)

Ответы [ 3 ]

2 голосов
/ 13 октября 2011

В-третьих, вы ничего не удаляете из набора выбранных ключей.Вы должны делать это каждый раз в цикле, например, вызывая keyIterator.remove () после вызова next ().

Вам необходимо прочитать учебные пособия NIO.

1 голос
/ 17 июля 2013

Попробуйте использовать библиотеку xsocket. Это сэкономило мне много времени на чтении на форумах.

Скачать: http://xsocket.org/

Учебник: http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm

Код сервера:

import org.xsocket.connection.*;

/**
 *
 * @author wsserver
 */
public class XServer {

    protected static IServer server;

    public static void main(String[] args) {
        try {
            server = new Server(9905, new XServerHandler());
            server.start();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
     protected static void shutdownServer(){
        try{
            server.close();
        }catch(Exception ex){
            System.out.println(ex.getMessage());
        }        
    }
}

Обработчик сервера:

import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.*;
import org.xsocket.*;
import org.xsocket.connection.*;

public class XServerHandler implements IConnectHandler, IDisconnectHandler, IDataHandler {

    private Set<ConnectedClients> sessions = Collections.synchronizedSet(new HashSet<ConnectedClients>());

    Charset charset = Charset.forName("ISO-8859-1");
    CharsetEncoder encoder = charset.newEncoder();
    CharsetDecoder decoder = charset.newDecoder();
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public boolean onConnect(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
        try {
            synchronized (sessions) {
                sessions.add(new ConnectedClients(inbc, inbc.getRemoteAddress()));
            }
            System.out.println("onConnect"+" IP:"+inbc.getRemoteAddress().getHostAddress()+" Port:"+inbc.getRemotePort());
        } catch (Exception ex) {
            System.out.println("onConnect: " + ex.getMessage());
        }
        return true;
    }

    @Override
    public boolean onDisconnect(INonBlockingConnection inbc) throws IOException {
        try {
            synchronized (sessions) {
                sessions.remove(inbc);
            }
            System.out.println("onDisconnect");
        } catch (Exception ex) {
            System.out.println("onDisconnect: " + ex.getMessage());
        }
        return true;
    }

    @Override
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {
        inbc.read(buffer);
        buffer.flip();
        String request = decoder.decode(buffer).toString();
        System.out.println("request:"+request);
        buffer.clear();
        return true;
    }
}

Подключенные клиенты:

import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class ConnectedClients {

    private INonBlockingConnection inbc;
    private InetAddress address;

    //CONSTRUCTOR
    public ConnectedClients(INonBlockingConnection inbc, InetAddress address) {
        this.inbc = inbc;
        this.address = address;
    }

    //GETERS AND SETTERS
    public INonBlockingConnection getInbc() {
        return inbc;
    }

    public void setInbc(INonBlockingConnection inbc) {
        this.inbc = inbc;
    }

    public InetAddress getAddress() {
        return address;
    }

    public void setAddress(InetAddress address) {
        this.address = address;
    }
}

Код клиента:

import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;
import org.xsocket.connection.NonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class XClient {

    protected static INonBlockingConnection inbc;
    public static void main(String[] args) {
        try {
            inbc = new NonBlockingConnection(InetAddress.getByName("localhost"), 9905, new XClientHandler());

            while(true){

            }
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
}

Клиентский обработчик:

import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.connection.IConnectExceptionHandler;
import org.xsocket.connection.IConnectHandler;
import org.xsocket.connection.IDataHandler;
import org.xsocket.connection.IDisconnectHandler;
import org.xsocket.connection.INonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class XClientHandler implements IConnectHandler, IDataHandler,IDisconnectHandler, IConnectExceptionHandler {

    Charset charset = Charset.forName("ISO-8859-1");
    CharsetEncoder encoder = charset.newEncoder();
    CharsetDecoder decoder = charset.newDecoder();
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public boolean onConnect(INonBlockingConnection nbc) throws IOException {
        System.out.println("Connected to server");
        nbc.write("hello server\r\n");
        return true;
    }

    @Override
    public boolean onConnectException(INonBlockingConnection nbc, IOException ioe) throws IOException {

        System.out.println("On connect exception:"+ioe.getMessage());
        return true;
    }

    @Override
    public boolean onDisconnect(INonBlockingConnection nbc) throws IOException {

        System.out.println("Dissconected from server");
        return true;
    }

    @Override
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {

        inbc.read(buffer);
        buffer.flip();
        String request = decoder.decode(buffer).toString();
        System.out.println(request);
        buffer.clear();
        return true;
    }
}

Я потратил много времени, читая об этом на форумах, надеюсь, я смогу помочь вам с моим кодом.

1 голос
/ 13 октября 2011

Прежде всего, вам больше не следует использовать вызовы wait () и notify (), поскольку в java.util.concurrent есть хорошие классы-оболочки Java (1.5+), такие как BlockingQueue .

Во-вторых, предлагается выполнять ввод-вывод в самом потоке выбора, а не в рабочих потоках.Рабочие потоки должны просто ставить в очередь операции чтения / записи в поток (ы) селектора.

Эта страница объясняет это довольно хорошо и даже предоставляет примеры рабочего кода простого сервера TCP / IP: http://rox -xmlrpc.sourceforge.net / niotut /

Извините, у меня еще нет времени взглянуть на ваш конкретный пример.

...