Java - ReadObject с помощью nio - PullRequest
5 голосов
/ 03 мая 2011

На традиционном сервере с блокирующим потоком я бы сделал что-то вроде этого

class ServerSideThread {

    ObjectInputStream in;
    ObjectOutputStream out;
    Engine engine;

    public ServerSideThread(Socket socket, Engine engine) {
        in = new ObjectInputStream(socket.getInputStream());
        out = new ObjectOutputStream(socket.getOutputStream());
        this.engine = engine;
    }

    public void sendMessage(Message m) {
        out.writeObject(m);
    }

    public void run() {
        while(true) {
            Message m = (Message)in.readObject();
            engine.queueMessage(m,this); // give the engine a message with this as a callback
        }
    }
}

Теперь можно ожидать, что объект будет довольно большим.В моем цикле nio я не могу просто ждать, пока объект пройдет, все мои другие соединения (с гораздо меньшими рабочими нагрузками) будут ждать меня.

Как я могу только получить уведомление, что соединение имеетвесь объект, прежде чем он сообщит моему каналу nio, что он готов?

Ответы [ 2 ]

7 голосов
/ 03 мая 2011

Вы можете записать объект в ByteArrayOutputStream, позволяя указать длину перед отправкой объекта. На принимающей стороне прочитайте объем необходимых данных, прежде чем пытаться их декодировать.

Однако вам, вероятно, будет гораздо проще и эффективнее использовать блокирующий ввод-вывод (а не NIO) с объектом * Stream


Редактировать что-то вроде этого

public static void send(SocketChannel socket,  Serializable serializable) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    for(int i=0;i<4;i++) baos.write(0);
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(serializable);
    oos.close();
    final ByteBuffer wrap = ByteBuffer.wrap(baos.toByteArray());
    wrap.putInt(0, baos.size()-4);
    socket.write(wrap);
}

private final ByteBuffer lengthByteBuffer = ByteBuffer.wrap(new byte[4]);
private ByteBuffer dataByteBuffer = null;
private boolean readLength = true;

public Serializable recv(SocketChannel socket) throws IOException, ClassNotFoundException {
    if (readLength) {
        socket.read(lengthByteBuffer);
        if (lengthByteBuffer.remaining() == 0) {
            readLength = false;
            dataByteBuffer = ByteBuffer.allocate(lengthByteBuffer.getInt(0));
            lengthByteBuffer.clear();
        }
    } else {
        socket.read(dataByteBuffer);
        if (dataByteBuffer.remaining() == 0) {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array()));
            final Serializable ret = (Serializable) ois.readObject();
            // clean up
            dataByteBuffer = null;
            readLength = true;
            return ret;
        }
    }
    return null;
}
2 голосов
/ 09 апреля 2013

Вдохновленный приведенным выше кодом, я создал ( проект GoogleCode )

Он включает в себя простой модульный тест:

SeriServer server = new SeriServer(6001, nthreads);
final SeriClient client[] = new SeriClient[nclients];

//write the data with multiple threads to flood the server

for (int cnt = 0; cnt < nclients; cnt++) {
    final int counterVal = cnt;
    client[cnt] = new SeriClient("localhost", 6001);
    Thread t = new Thread(new Runnable() {
         public void run() {
             try {
                for (int cnt2 = 0; cnt2 < nsends; cnt2++) {
                   String msg = "[" + counterVal + "]";                       
                   client[counterVal].send(msg);
                 }
             } catch (IOException e) {
                 e.printStackTrace();
                 fail();
             }
         }
         });
    t.start();
 }

 HashMap<String, Integer> counts = new HashMap<String, Integer>();
   int nullCounts = 0;
   for (int cnt = 0; cnt < nsends * nclients;) {
       //read the data from a vector (that the server pool automatically fills
       SeriDataPackage data = server.read();  
       if (data == null) {
              nullCounts++;
              System.out.println("NULL");
              continue;
       }

       if (counts.containsKey(data.getObject())) {
              Integer c = counts.get(data.getObject());
              counts.put((String) data.getObject(), c + 1);
        } else {
              counts.put((String) data.getObject(), 1);
        }
        cnt++;
        System.out.println("Received: " + data.getObject());
   }

   // asserts the results
   Collection<Integer> values = counts.values();
   for (Integer value : values) {
        int ivalue = value;
        assertEquals(nsends, ivalue);
        System.out.println(value);
   }
   assertEquals(counts.size(), nclients);
   System.out.println(counts.size());
   System.out.println("Finishing");
   server.shutdown();
...