Java NIO SocketChannels: проблема сериализации с несколькими объектами и высокая загрузка системы - PullRequest
2 голосов
/ 23 октября 2011

Я использую SocketChannels из java.nio для отправки объектов между несколькими узлами в сети p2p.Каждый узел имеет ServerSocketChannel, к которому могут подключаться другие узлы.Я отправляю сериализованные объекты через эти SocketChannels.Основой моего кода в основном является учебник по NIO http://rox -xmlrpc.sourceforge.net / niotut /

Все отправляемые мной сообщения реализуют один и тот же интерфейс, поэтому я могу выполнить десериализациюна стороне получателя.Следующий код делает это (обновлено с помощью счетчика байтов, см. Ниже):

private void readKey(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer readBuffer = ByteBuffer.allocate(4);

    channel.read(readBuffer);
    readBuffer.rewind();
    int numBytes = readBuffer.getInt();

    readBuffer = ByteBuffer.allocate(numBytes);
    int read = channel.read(readBuffer);

    Message msg = Message.deserialize(readBuffer);  
    this.overlay.addIncomingMessage(msg);
}

Отправка выполняется посредством сериализации объекта, сообщение сериализуется и добавляется в очередь, проценты по интересам дляканалы меняются, и сериализованный объект отправляется.

private void writeKey(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);

        // Write until there's not more data ...
        while (!queue.isEmpty()) {
            ByteBuffer buf = (ByteBuffer) queue.get(0);

                            // UPDATE for message length
            ByteBuffer len = ByteBuffer.allocate(4);
            len.putInt(buf.remaining());
            len.rewind();

            socketChannel.write(len);
            socketChannel.write(buf);
            if (buf.remaining() > 0) {
                // ... or the socket's buffer fills up
                break;
            }
            queue.remove(0);
        }

        if (queue.isEmpty()) {
            key.interestOps(SelectionKey.OP_READ);
        }
    }       
}

Это все работает нормально, пока загрузка системы мала.Когда я начинаю отправлять много сообщений, некоторые сообщения, которые пишутся на канал, не принимаются.Когда я добавляю задержки в свой код для замедления, все работает как положено.

Я думаю, что может быть проблема, когда несколько сообщений отправляются до того, как получатель считывает буферы каналов для создания объектов, но я неНе знаю, как решить эту проблему.

Я ценю любые подсказки или идеи.

С уважением, Кристоф

ОБНОВЛЕНИЕ: После первой подсказки я добавил количество переданных байтовотправляющей стороне и считывайте только то количество байтов на приемнике, но без эффекта.

1 Ответ

6 голосов
/ 23 октября 2011

вы, похоже, предполагаете, что получатель будет получать «пакеты» сообщения в тех же порциях, в которых отправитель их отправляет.это не будет иметь место.базовый сокет может произвольно разделять / объединять отправляемые вами фрагменты.

для реализации подобного протокола на основе сообщений вам необходимо управлять порциями сообщений.при чтении вы должны обрабатывать сокет как поток данных и не предполагать, что буфер данных, полученный от вызова чтения, соответствует одному сообщению.Один из способов реализации протокола на основе сообщений в потоке состоит в том, чтобы сначала записать длину сообщения, а затем записать байты сообщения.на принимающей стороне вы сначала читаете длину сообщения, затем только расходует столько байтов при разборе сообщения.

ОБНОВЛЕНИЕ:

если вы все еще теряете сообщения,Я думаю, что проблема в вашей логике синхронизациипо включенному коду трудно сказать, но как синхронизируются отдельные очереди (я вижу только блокировку верхнего уровня в списке pendingData).также, какую синхронизацию вы используете на принимающей стороне?

...