Java Selector застрял на select () после принятия подключения - PullRequest
1 голос
/ 25 февраля 2012

В настоящее время я работаю над домашним заданием, где мне нужно создать сервер, который принимает соединения с помощью Java Selector и общается по каналам сокетов. Клиенты подключаются к Серверу, используя свой собственный канал сокета, сервер принимает соединение, устанавливает новый канал сокета на OP_READ и ожидает чтения информации. Ниже приведен метод run для сервера и методы accept и read для сервера.

//TODO: Figure out why the data is being read over and over again
    public void run()
    {

            while(true)
            {
                try{


                // Process potential change requests
                processChangeRequests();    
                //select all the things! returns the amount of channels that are ready 

                //PPP: Select is stuck after accepted socket is changed to OP_READ -- data is not read
                this.selector.select();

                //DEBUG
                System.out.println("Selector Selected Something!");

                //get the set of keys of the channels that are ready
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

                while(keyIterator.hasNext()) {

                    //get the key itself
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();

                    System.out.println("key: "+key.interestOps());

                    if(!key.isValid())
                    {
                        continue;
                    }

                    //conditions
                    if(key.isAcceptable()) {
                        // a connection was accepted by a ServerSocketChannel.
                        this.accept(key);
                    } 
                    else if (key.isReadable()) {
                        // a channel is ready for reading
                        this.readData(key);
                    } 
                    else if (key.isWritable()) {
                        // a channel is ready for writing

                        //this.fPool.submitTask(new MessageDigestProcessor(key,this.buffer.array()));
                    }


                }
                //fPool.submitTask(new MessageDigestProcessor(socket));
            }catch (Exception e) { e.printStackTrace(); }
        }
    }

    private void accept(SelectionKey key) throws IOException
    {
        System.out.println("Accepted Connection!");
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
        SocketChannel clientSocketChannel = serverSocketChannel.accept();
        clientSocketChannel.configureBlocking(false);
        clientSocketChannel.register(this.selector, SelectionKey.OP_READ);
    }

/**
     * Reads a message digest from the input stream from the socket
     * @return the message digest read from the input stream or "" if there was a problem reading
     * TODO read until there is no more data to be read?
     * TODO do not close the socket channels unless there is an exception from reading
     */
    private void readData(SelectionKey key) throws IOException
    {
        SocketChannel clientSocketChannel = (SocketChannel) key.channel();
        //DEBUG
        System.out.println("Message received from: "+clientSocketChannel.socket().getInetAddress());
        //clear the buffer before reading
        this.buffer.clear();
        int numRead;
        try
        {
            numRead = clientSocketChannel.read(this.buffer);    
            //DEBUG
            System.out.println("num read: "+numRead);

        } catch (IOException e)
        {
            key.cancel();
            clientSocketChannel.close();
            return;
        }

        if(numRead==-1)
        {
            key.cancel();
            key.channel().close();
            return;
        }

        //DEBUG
        try {
            System.out.println(Utility.SHA1FromBytes(this.buffer.array()));
        } catch (NoSuchAlgorithmException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        //time for writing!
        key.interestOps(SelectionKey.OP_WRITE);

    }

Прием работает нормально после того, как я подключаюсь к серверу с клиентом, но после того, как клиент записывает данные, сервер блокируется на this.selector.select (), поэтому он никогда не вызывает readData (). Я что-то упускаю? Я следовал за кодом через отладчик в eclipse, и на этом он остановился.

РЕДАКТИРОВАТЬ: Вот код клиента

package cs455.scaling;

import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Client implements Runnable {

    //stores all the hash codes for all messages that are sent to the server
    private final LinkedList<String> hashCodes = new LinkedList<String>();
    private final long messageRate;
    private final ByteBuffer buffer = ByteBuffer.allocate(8192);
    private final Selector selector;
    private final InetSocketAddress serverSocketAddress;//maintain an open connection at all times, bro. Why should I shut it?
    private final List<ChangeRequest> changeRequests = new LinkedList<ChangeRequest>();
    //private final Thread senderThread;


    public Client(String ipAddress, int port, long messageRate) throws UnknownHostException, IOException
    {
        this.serverSocketAddress = new InetSocketAddress(InetAddress.getByName(ipAddress),port);
        this.messageRate = messageRate;
        this.selector = Selector.open();
//      senderThread = new Thread(new MessageDigestSender(this));
//      senderThread.start();
    }

    public long getMessageRate()
    {
        return messageRate;
    }

    /**
     * Generates a message digest and sends it over the connected socket
     * TODO open new connection each time a send needs to occur?
     * @throws IOException
     */
    public void sendMessageDigest() throws IOException
    {
        initConnection();
        //generate the message
        byte [] data = generateMessageDigest();
        //prepare the data
        buffer.clear();
        buffer.put(data);

        this.selector.wakeup();
    }

    /**
     * Does the actual writing of the message
     * @param SelectionKey key that represents the channel that is being written to
     */
    private void write(SelectionKey key) throws IOException
    {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        socketChannel.write(buffer);
        System.out.println("Wrote data to Server...");
        key.interestOps(SelectionKey.OP_READ);//not interested in writing for right now -- wait for message from the server
    }

    public void run()
    {
        while(true)
        {
            try{

                //process the socket channel op changes
                processChangeRequests();

                this.selector.select();

                //get the set of keys of the channels that are ready
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

                while(keyIterator.hasNext()) {

                    //get the key itself
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();

                    //System.out.println(key.interestOps());

                    //avoid invalid keys
                    if(!key.isValid())
                    {
                        continue;
                    }   
                    if (key.isConnectable()) {
                        this.finishConnection(key);
                    } else if (key.isReadable()) {
                        this.readData(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) { e.printStackTrace(); }
        } 
    }

    /**
     * Method that queues up changes that need to be made to selection keys before they are processed
     * this is useful for when multiple threads need to make changes to selection keys
     * XXX: Used when the caller is NOT the selecting thread
     */
    private void processChangeRequests() throws IOException
    {
        synchronized(this.changeRequests)
        {
            Iterator<ChangeRequest> changes = this.changeRequests.iterator();
            while(changes.hasNext())
            {
                ChangeRequest changeRequest = changes.next();
                switch(changeRequest.type)
                {
                    case(ChangeRequest.CHANGE_OP):
                        changeRequest.channel.keyFor(this.selector).interestOps(changeRequest.ops);
                        break;
                    case ChangeRequest.REGISTER:
                        changeRequest.channel.register(this.selector, changeRequest.ops);
                        break;
                    }
            }
            this.changeRequests.clear();
        }
    }


    /**
     * Initialize the socket channel on the specified ip address and port
     * configure it for non-blocking
     * @param ipAddress
     * @param port
     * @throws IOException
     */
    private void initConnection() throws IOException
    {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(this.serverSocketAddress);

        //a request is made because the selecting thread is not the caller of this function
        synchronized(this.changeRequests){
            this.changeRequests.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER,SelectionKey.OP_CONNECT));
        }
    }

    /**
     * Finish the connection by calling finishConnect() on the channel and then setting it to OP_WRITE
     * @param key
     */
    private void finishConnection(SelectionKey key)
    {
        // a connection was established with a remote server.
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Finish the connection. If the connection operation failed
        // this will raise an IOException.
        try {
            socketChannel.finishConnect();
            System.out.println("Finished connecting to server");
        } catch (IOException e) {
          // Cancel the channel's registration with our selector
            e.printStackTrace();
            key.cancel();
          return;
        }

        // Register an interest in writing on this channel
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Serves as a wrapper around the SHA1FromBytes method
     * It generates a byte array using the util.Random class and then generates a message digest from those bytes
     * If the algorithm doesn't exist then a blank string is returned
     */
    private byte [] generateMessageDigest()
    {
        Random random = new Random();
        byte [] data = new byte[8192];
        random.nextBytes(data);
        String digest = "";
        try {
            digest = Utility.SHA1FromBytes(data);
            //add it to the hashCodes linkedList
            hashCodes.add(digest);
            System.out.println("Generated Digest: "+digest);
        } catch (NoSuchAlgorithmException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return data;

    }

    /**
     * Reads a message digest from the input stream from the socket
     * @return the message digest read from the input stream or "" if there was a problem reading
     * TODO read until there is no more data to be read?
     * TODO do not close the socket channels unless there is an exception from reading
     */
    private void readData(SelectionKey key) throws IOException
    {
        SocketChannel clientSocketChannel = (SocketChannel) key.channel();
        //DEBUG
        System.out.println("Message received from: "+clientSocketChannel.socket().getInetAddress());
        //clear the buffer before reading
        this.buffer.clear();
        int numRead;
        try
        {
            numRead = clientSocketChannel.read(this.buffer);    
            //DEBUG
            System.out.println("num read: "+numRead);

        } catch (IOException e)
        {
            //problem reading from a socket channel
            e.printStackTrace();
            key.cancel();
            clientSocketChannel.close();
            return;
        }

        this.buffer.flip();
        System.out.println("Received Message Digest: "+new String(this.buffer.array()));

        //done! open business later on!
        key.channel().close();
        key.channel().keyFor(this.selector).cancel();


    }


    public static void main(String [] args)
    {
        String serverHost = "";
        int serverPort = 0;
        long messageRate = 0;

        if(args.length<3)
        {
            System.err.println("Format: java cs455.scaling.Client [server-host] [server-port] [message-rate]");
            System.exit(0);
        }

        //parse the arguments out
        try{

            serverHost = args[0];
            serverPort = Integer.parseInt(args[1]);
            messageRate = Long.parseLong(args[2]);

        } catch(NumberFormatException ne) {
            ne.printStackTrace();
        } catch (ArrayIndexOutOfBoundsException ae)
        {
            ae.printStackTrace();
        }

        //start the client
        try
        {
                Client client = new Client(serverHost,serverPort,messageRate);
                Thread t = new Thread(client);
                t.start();
                client.sendMessageDigest();

        } catch(IOException i)
        {
            i.printStackTrace();
        }

    }
}

1 Ответ

2 голосов
/ 27 февраля 2012

Вы игнорируете возвращаемые значения read () и write (), поэтому вы не можете обнаружить короткую запись или запись нулевой длины или условия EOS при чтении.

...