Объект, проходящий через неблокирующие сокеты Java NIO, не всегда успешен - PullRequest
1 голос
/ 20 июля 2011

Я пишу программу для симуляции пиров в сети P2P с использованием неблокирующих сокетов Java NIO.Идея состоит в том, чтобы каждый пир использовал один и тот же код для отправки и получения сообщений, а также сервер, позволяющий пирам загружаться в сеть.

Проблема, с которой я столкнулся, заключается в том, что до четырех пиров могут успешноподключайтесь к сети и общайтесь друг с другом (Ping, Pong, Query и QueryHit), когда я добавляю пятого пира, сервер всегда сообщает об исключении «StreamCorruptedException».Я проверил этот сайт, а также веб-сайты с кодами и учебными пособиями по Java NIO, но безрезультатно.Я понимаю, что отправка объектов через неблокирующие сокеты не легка / идеальна (особенно с ObjectOutputStream и ObjectInputStream), но я хочу минимизировать использование потоков (также я не хочу переписывать это с нуля!).

Сначала я покажу самые важные методы (отправка и получение сообщения), но я могу добавить больше, если потребуется.

Метод записи:

public void write(SelectionKey selKey){ 
    SocketChannel channel = (SocketChannel)selKey.channel();
    ArrayList<Message> queue = pending.get(channel);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos;
    try{
        oos = new ObjectOutputStream(baos);
    }catch(Exception e){
        System.err.println("Could not create object output stream. Aborting...");
        return;
    }

    while(!queue.isEmpty()){            
        Message message = queue.get(0);
        buffer.clear();
        try{
            oos.writeObject(message);
            buffer = ByteBuffer.wrap(baos.toByteArray());
            channel.write(buffer);
            oos.flush();
            baos.flush();
        }catch(Exception e){
            System.err.println("Could not parse object. Aborting...");
            queue.remove(0);
            return;
        }
        queue.remove(0);
    }
    selKey.interestOps(SelectionKey.OP_READ);
}

И чтениеМетод:

public Message read(SelectionKey selKey) throws IOException, ClassNotFoundException{
    SocketChannel channel = (SocketChannel)selKey.channel();        
    Message message = null;

    buffer = ByteBuffer.allocate(8192);

    int bytesRead = channel.read(buffer);
    if(bytesRead > 0){
        buffer.flip();
        InputStream bais = new ByteArrayInputStream(buffer.array(), 0, buffer.limit());
        ObjectInputStream ois = new ObjectInputStream(bais); //Offending line. Produces the StreamCorruptedException.
        message = (Message)ois.readObject();
        ois.close();
    }

    return message;
}

Любая помощь будет принята с благодарностью!

Ответы [ 2 ]

3 голосов
/ 20 июля 2011
int bytesRead = channel.read(buffer);
if(bytesRead > 0){
    buffer.flip();
    InputStream bais = new ByteArrayInputStream(buffer.array(), 0, buffer.limit());
    ObjectInputStream ois = new ObjectInputStream(bais); //Offending line. Produces the StreamCorruptedException.
    message = (Message)ois.readObject();
    ois.close();
}

Здесь вы смешиваете блокирующий и неблокирующий ввод / вывод. Буфер не будет исчерпывающе читать из сокета, он будет читать только то, что доступно. Если вы собираетесь использовать этот подход, сначала вам нужно прочитать все данные в буфер.

2 голосов
/ 20 июля 2011

Вы не можете предполагать, что ваше чтение вернет весь объект в одном куске. Вам нужен протокол «out-of-band» (ну, вне диапазона ObjectStream), чтобы убедиться, что вы получаете полные сообщения, при необходимости собирая сообщения из фрагментов чтения. Для неблокирования это будет намного сложнее, чем сейчас.

...