Java NIO Проблема / Непонимание того, как работает isReadable - PullRequest
0 голосов
/ 08 июня 2011

Я обнаружил, что NIO в лучшем случае плохо документировано, за исключением упрощенного случая. Несмотря на это, я прошел через учебные пособия и несколько рефакторингов и в конечном итоге отодвинулся к простейшему случаю, и я до сих пор иногда запускаю isReadable с 0-байтовым чтением SocketChannel. Это происходит не при каждом исполнении.

Раньше я вызывал чтение из присоединенного объекта в отдельном потоке и думал, что это могут быть условия гонки, но с тех пор я прочитал, что чтение происходит в потоке селектора, и проблема все еще сохраняется. Я предполагаю, что это может быть мой тестовый клиент, но я не уверен, что вызвало бы его непоследовательным образом, поскольку сокет клиента не должен закрываться, пока не получит ответ от сервера.

Таким образом, во включенном коде сообщение "привет", отправленное этим фрагментом, каждый раз проходит нормально, как я и ожидал

        out.write("hello".getBytes());
        out.write(EOT);
        out.flush();

Именно после этого я иногда получаю канал сокета 0 длины. И иногда получить правильный ответ из этого фрагмента:

        out.write(dataServerCredentials.getBytes());
        out.write(EOT);
        out.flush();

Любое понимание этого будет оценено, оно убивает меня медленно. Я уже пытался найти ответы здесь, и один вопрос, который показался мне актуальным, не особо пролил свет на мои проблемы.

Заранее спасибо!

Фрагменты кода ниже:

Метод выбора:

public void execute()
{
    initializeServerSocket();

    for (;;)
    {
        try
        {
            System.out.println("Waiting for socket activity");

            selector.select();

            Iterator<SelectionKey> selectedKeys = 
                this.selector.selectedKeys().iterator();
            while(selectedKeys.hasNext())
            {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) 
                {
                    continue;
                }

                if (key.isAcceptable())
                {   // New connection
                    // TODO: Create helper method for this that configures user info?
                    System.out.println("Accepting connection");

                    ServerSocketChannel serverSocketChannel =
                        (ServerSocketChannel)key.channel();
                    SocketChannel socketChannel =
                        serverSocketChannel.accept();

                    socketChannel.socket().setSoTimeout(0);
                    socketChannel.configureBlocking(false);
                    SelectionKey newKey = 
                        socketChannel.register(selector, SelectionKey.OP_READ);

                    // Create and attach an AuthProcessor to drive the states of this
                    // new Authentication request
                    newKey.attach(new AuthenticationRequestProcessor(newKey));

                }
                else if (key.isReadable())
                {   // Socket has incoming communication
                    AuthenticationRequestProcessor authProcessor =
                        (AuthenticationRequestProcessor)key.attachment();

                    if (authProcessor == null)
                    {   // Cancel Key
                        key.channel().close();
                        key.cancel();
                        System.err.print("Cancelling Key - No Attachment");
                    }
                    else
                    {   
                        if (authProcessor.getState() ==
                            AuthenticationRequestProcessor.TERMINATE_STATE)
                        {   // Cancel Key
                            key.channel().close();
                            key.cancel();
                        }
                        else
                        {   // Process new socket data
                            authProcessor.process(readStringFromKey(key));
                        }
                    }
                }                    
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Метод чтения (игнорируйте некоторые глупости здесь, это было выдернуто из другого потока)

protected String readStringFromKey(SelectionKey key)
{
    SocketChannel socketChannel = (SocketChannel)key.channel();

    readBuffer.clear();

    String message = null;

    try
    {
        final int bytesRead = socketChannel.read(readBuffer);

        if (-1 == bytesRead)
        {   // Empty/Closed Channel
            System.err.println("Error - No bytes to read on selected channel");
        }
        else
        {   // Convert ByteBuffer into a String
            System.out.println("Bytes Read: " + bytesRead);
            readBuffer.flip();
            message = byteBufferToString(readBuffer, bytesRead);
            readBuffer.clear();
        }
    }
    catch (IOException e)
    {
        // TODO Auto-generated catch block

        e.printStackTrace();
    }

    // Trim EOT off the end of the message
    return message.trim();
}

Фрагменты клиента:

    public void connect()
{
    boolean connectionStatus = false;
    String connectionHost = null;
    int connectionPort = 0;
    String connectionAuthKey = null;

    try
    {   // Login
        authenticationSocket = new Socket(AUTH_HOST, AUTH_PORT);
        out = authenticationSocket.getOutputStream();
        in = new BufferedInputStream(authenticationSocket.getInputStream());

        out.write("hello".getBytes());
        out.write(EOT);
        out.flush();


        StringBuilder helloResponse = new StringBuilder();

        // Read response off socket
        int currentByte = in.read();

        while (currentByte > -1 && currentByte != EOT)
        {
            helloResponse.append((char)currentByte);
            currentByte = in.read();
        }

        outgoingResponses.offer(Plist.fromXml(helloResponse.toString()));
        System.out.println("\n" + helloResponse.toString());

        out.write(credentials.getBytes());
        out.write(EOT);
        out.flush();

        // Read status
        int byteRead;

        StringBuilder command = new StringBuilder();

        do 
        {
            byteRead = in.read();
            if (0 < byteRead) 
            {
                if (EOT == byteRead)
                {
                    Logger.logData(command.toString());

                    Map<String, Object> plist = Plist.fromXml(command.toString());
                    outgoingResponses.offer(plist);

                    // Connection info for Data Port
                    connectionStatus = (Boolean)plist.get(STATUS_KEY);
                    connectionHost = (String)plist.get(SERVER_KEY);
                    connectionPort = (Integer)plist.get(PORT_KEY);
                    connectionAuthKey = (String)plist.get(AUTH_KEY);

                    Logger.logData("Server =>" + plist.get("server"));

                    command = new StringBuilder();

                }
                else
                {
                    command.append((char)byteRead);
                }
            }
        } 
        while (EOT != byteRead);
    }
    catch (UnknownHostException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    catch (IOException e)
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    catch (XmlParseException e)
    {
        Logger.logData("Invalid Plist format");
        e.printStackTrace();
    }
    finally
    {   // Clean up handles
        try
        {
            authenticationSocket.close();
            out.close();
            in.close();
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    System.out.println("Connection status =>" + connectionStatus);
    System.out.println("Connection host =>" + connectionHost);
    System.out.println("Connection port =>" + connectionPort);

    if (connectionStatus)
    {
        dataServerHost = connectionHost;
        dataServerPort = connectionPort;
        dataServerAuthKey = connectionAuthKey;
        System.out.println("Connecting to data server @: " + dataServerHost + ":" + dataServerPort);
        connectToDataServer();
    }
}

1 Ответ

2 голосов
/ 08 июня 2011

Я помню, что пробуждение по ложному селектору возможно.

Хотя забавно, что нечего читать, когда вам просто говорят, что есть что-то прочитать, но обычно это не проблема для программ.Программа обычно должна ожидать произвольное количество байтов при чтении потока TCP;и случай 0 байтов обычно не требует специальной обработки.

Ваша программа теоретически неверна.Вы не можете ожидать, что сможете прочитать все сообщение сразу.Одно чтение может вернуть только часть.Может быть только 1 байт.Там нет никакой гарантии.

"Праведный" способ заключается в накоплении всех байтов, считанных в буфере.Ищите EOT в буфере.Если сообщение фрагментировано, может потребоваться несколько операций чтения, прежде чем будет получено сообщение целиком.

loop 
  select();
  if readable
     bytes = read()
     buffer.append(bytes)
     while( buffer has EOT at position i)
       msg = buffer[0-i]
       left shift buffer by i

Вы можете видеть в этом потоке, что не имеет значения, если read () читает 0 байтов.И это действительно не о НИО.Даже в традиционной блокировке TCP IO эта стратегия должна быть теоретически правильной.

Но практически, если вы заметите, что все сообщение всегда состоит из одного куска, вам не нужно такое усложнение;Ваш исходный код практически корректен в вашей среде.

Теперь вы заметили, что иногда читается 0 байт.Тогда ваше предыдущее практическое предположение должно быть пересмотрено.Вы можете добавить специальную ветку, чтобы игнорировать 0-байтовый фрагмент.

...