Java NIO Framework перестает работать под большой нагрузкой без записи - PullRequest
0 голосов
/ 17 сентября 2010

Проблема довольно странная для меня, хотя я новичок.

То, что происходит, заключается в том, что, если вы заставляете сервер работать под большой нагрузкой и продолжаете отправлять недопустимый пакет, который не представляет пакет POLICY_XML.

Практически то, что я пытаюсь сказатьв том, что если вы подключите его, он перейдет в сокет READ OPERATION.Тогда вы никогда не заходите в send (), которая меняет SelectionKey на WRITE OPERATION.Каким-то образом операции чтения складываются, и после 2000 или около того запросов на соединение сервер перестанет принимать соединения, несмотря ни на что.Я пытался соединиться с telnet и всегда не мог установить соединение. Но примерно через 5 минут он снова начинает принимать соединения и становится полностью функциональным.

Очень странная проблема, но учтите, что если я удаляю соответствующий пакетУтверждение, что он будет действовать аналогично эхо-серверу.Тогда он будет работать бесконечно, не испытывая проблем с подключением, в значительной степени он становится стабильным.

Я прикрепил весь исходный код сервера ниже.Может кто-то, кто обладает обширными знаниями в NIO, пожалуйста, проверьте его и дайте мне знать, если есть способ это исправить.

Все, что действительно бросается в глаза, это пробуждение селектора в send (), которое может исправить все послепомещая строку в read (), она, похоже, абсолютно ничего не делает, и проблема остается.

// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();

Вот источник простого сервера.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class PolicyServer implements Runnable {
    public static final String POLICY_REQUEST = "<policy-file-request/>";
    public static final String POLICY_XML =
        "<?xml version=\"1.0\"?>"
        + "<cross-domain-policy>"
        + "<allow-access-from domain=\"*\" to-ports=\"*\" />"
        + "</cross-domain-policy>"
        + (char)0;


    // The host:port combination to listen on
    private InetAddress hostAddress;
    private int port;

    // The channel on which we'll accept connections
    private ServerSocketChannel serverChannel;

    // The selector we'll be monitoring
    private Selector selector;

    // The buffer into which we'll read data when it's available
    private ByteBuffer readBuffer = ByteBuffer.allocate(255);

    // This decodes raw bytes into ascii data.
    private CharsetDecoder asciiDecoder;

    // A list of PendingChange instances
    private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();

    // Maps a SocketChannel to a list of ByteBuffer instances
    private Map<SocketChannel, List<ByteBuffer>> pendingData = new HashMap<SocketChannel, List<ByteBuffer>>();

    public PolicyServer(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = this.initSelector();
        this.asciiDecoder = Charset.forName("US-ASCII").newDecoder().onMalformedInput(
                                CodingErrorAction.REPLACE).onUnmappableCharacter(
                                CodingErrorAction.REPLACE);
    }

    public void send(SocketChannel socket, byte[] data) {
        synchronized (this.pendingChanges) {
            // Indicate we want the interest ops set changed
            this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

            // And queue the data we want written
            synchronized (this.pendingData) {
                List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
                if (queue == null) {
                    queue = new ArrayList<ByteBuffer>();
                    this.pendingData.put(socket, queue);
                }
                queue.add(ByteBuffer.wrap(data));
            }
        }

        // Finally, wake up our selecting thread so it can make the required changes
        this.selector.wakeup();
    }

    public void run() {
        while (true) {
            try {
                // Process any pending changes
                synchronized (this.pendingChanges) {
                    Iterator changes = this.pendingChanges.iterator();
                    while (changes.hasNext()) {
                        ChangeRequest change = (ChangeRequest) changes.next();
                        changes.remove();
                        if(change == null) continue;
                        switch (change.type) {
                        case ChangeRequest.CHANGEOPS:
                            SelectionKey key = change.socket.keyFor(this.selector);
                            try {
                                if(key!=null)
                                    key.interestOps(change.ops);
                            } catch(Exception ex) {
                                if (key!=null)
                                    key.cancel();
                            }
                        }
                    }
                    this.pendingChanges.clear();
                }

                // Wait for an event one of the registered channels
                this.selector.select();

                // Iterate over the set of keys for which events are available
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        key.cancel();
                        continue;
                    }

                    // Check what event is available and deal with it
                    try {
                        if (key.isAcceptable()) {
                            this.accept(key);
                        } else if (key.isReadable()) {
                            this.read(key);
                        } else if (key.isWritable()) {
                            this.write(key);
                        }
                    } catch(IOException io) {
                        this.pendingData.remove(key.channel());
                        try {
                            ((SocketChannel)key.channel()).socket().close();
                        } catch (IOException e) {}
                        key.channel().close();
                        key.cancel();
                        key.attach(null);
                        key = null;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        // also contains a attachment of a new StringBuffer (for storing imcomplete/multi packets)
        socketChannel.register(this.selector, SelectionKey.OP_READ, new StringBuffer());
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead = socketChannel.read(this.readBuffer);

        if (numRead == -1) {
            // Remote entity shut the socket down cleanly. Do the
            // same from our end and cancel the channel.
            throw new IOException("");
        }

        // Grab the StringBuffer we stored as the attachment
        StringBuffer sb = (StringBuffer)key.attachment();

        // Flips the readBuffer by setting the current position of
        // packet stream to beginning.
        // Append the data to the attachment StringBuffer
        this.readBuffer.flip();
        sb.append(this.asciiDecoder.decode(this.readBuffer).toString());
        this.readBuffer.clear();

        // Get the policy request as complete packet
        if(sb.indexOf("\0") != -1) {
            String packets = new String(sb.substring(0, sb.lastIndexOf("\0")+1));
            sb.delete(0, sb.lastIndexOf("\0")+1);
            if(packets.indexOf(POLICY_REQUEST) != -1)
                send(socketChannel, POLICY_XML.getBytes());
        } else if(sb.length() >  8192) {
            sb.setLength(0);
            //Force disconnect.
            throw new IOException("");
        }
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        synchronized (this.pendingData) {
            List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);

            if(queue == null || queue.isEmpty()) {
                // We wrote away all data, so we're no longer interested
                // in writing on this socket. Switch back to waiting for
                // data.
                try {
                    if (key!=null)
                        key.interestOps(SelectionKey.OP_READ);
                } catch(Exception ex) {
                    if (key!=null)
                        key.cancel();
                }
            }

            // Write until there's not more data ...
            while (!queue.isEmpty()) {
                ByteBuffer buf = (ByteBuffer) queue.get(0);
                socketChannel.write(buf);
                if (buf.remaining() > 0) {
                    // ... or the socket's buffer fills up
                    break;
                }
                queue.remove(0);
            }
        }
    }

    private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
    }

    public static void main(String[] args) {
        try {
            new Thread(new PolicyServer(null, 5556)).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


import java.nio.channels.SocketChannel;

public class ChangeRequest {
    public static final int CHANGEOPS = 1;

    public SocketChannel socket;
    public int type;
    public int ops;

    public ChangeRequest(SocketChannel socket, int type, int ops) {
        this.socket = socket;
        this.type = type;
        this.ops = ops;
    }
}

Ответы [ 3 ]

1 голос
/ 29 октября 2013

Для больших нагрузок, пожалуйста, убедитесь, что вы используете платформу, которая делает пул соединений.Это уменьшит количество открытых сокетов и будет держать все под контролем.

Проверьте sfnrpc (http://code.google.com/p/sfnrpc), который использует пул общих ресурсов и по умолчанию объединяет 20 соединений.

0 голосов
/ 29 октября 2010

((SocketChannel) key.channel ()). Гнездо (). Закрыть ()

Тебе все это не нужно. Измените это на:

key.channel().close()

send (), который меняет SelectionKey на НАПИСАТЬ ОПЕРАЦИЮ

Я бы хотел увидеть детали этого. Скорее всего, вы никогда не получите из состояния OP_WRITE.

0 голосов
/ 17 сентября 2010

Отредактировано после прочтения комментариев:

После прочтения ваших комментариев я думаю, что количество открытых в данный момент сокетов, похоже, является проблемой.открыть сокет и держать его открытым, даже когда он простаивает, безусловно, выглядит как проблема.Каждый сокет требует системных ресурсов, таких как inode.

Попробуйте запустить ulimit, если вы используете Linux. ПРИМЕЧАНИЕ. Не рекомендуется устанавливать его в качестве неограниченного, но вы можете попробовать это:

 ulimit -u unlimited
...