Попытка пакетировать TCP с неблокирующим вводом-выводом трудна Я делаю что-то неправильно? - PullRequest
3 голосов
/ 12 апреля 2010

О, как бы я хотел, чтобы TCP был основан на пакетах, как UDP! [см. комментарии] Но, увы, дело не в этом, поэтому я пытаюсь реализовать свой собственный пакетный уровень. Вот цепочка событий (игнорируя запись пакетов)

О, и мои Пакеты очень просто структурированы: два беззнаковых байта для длины, а затем байтовые [длина] данных. (Я не могу себе представить, если бы они были более сложными, я был бы до ушей в if заявлениях!)

  • Server находится в бесконечном цикле, принимая соединения и добавляя их в список Connection с.
  • PacketGatherer (другой поток) использует Selector, чтобы выяснить, какие Connection.SocketChannel s готовы к чтению.
  • Он перебирает результаты и сообщает каждому Connection read().
  • Каждый Connection имеет частичный IncomingPacket и список Packet с, которые были полностью прочитаны и ожидают обработки.
  • Вкл. read():
    • Скажите частичное IncomingPacket, чтобы прочитать больше данных. (IncomingPacket.readData ниже)
    • Если чтение завершено (IncomingPacket.complete()), введите из него Packet и вставьте Packet в список, ожидающий обработки, а затем замените его новым IncomingPacket.

Есть пара проблем с этим. Во-первых, одновременно читается только один пакет. Если IncomingPacket требуется только один байт, то этот проход считывается только один байт. Это, конечно, можно исправить с помощью петли, но это становится немного сложнее, и мне интересно, есть ли лучший общий способ.

Во-вторых, логика в IncomingPacket немного сумасшедшая, чтобы иметь возможность прочитать два байта для длины и затем прочитать фактические данные. Вот код для быстрого и удобного чтения:

int readBytes;         // number of total bytes read so far
byte length1, length2; // each byte in an unsigned short int (see getLength())

public int getLength() { // will be inaccurate if readBytes < 2
    return (int)(length1 << 8 | length2);
}

public void readData(SocketChannel c) {
    if (readBytes < 2) { // we don't yet know the length of the actual data
        ByteBuffer lengthBuffer = ByteBuffer.allocate(2 - readBytes);
        numBytesRead = c.read(lengthBuffer);

        if(readBytes == 0) {
            if(numBytesRead >= 1)
                length1 = lengthBuffer.get();

            if(numBytesRead == 2)
                length2 = lengthBuffer.get();
        } else if(readBytes == 1) {
            if(numBytesRead == 1)
                length2 = lengthBuffer.get();
        }
        readBytes += numBytesRead;
    }

    if(readBytes >= 2) { // then we know we have the entire length variable
        // lazily-instantiate data buffers based on getLength()
        // read into data buffers, increment readBytes

        // (does not read more than the amount of this packet, so it does not
        // need to handle overflow into the next packet's data)
    }
}

public boolean complete() {
    return (readBytes > 2 && readBytes == getLength()+2);
}

В основном мне нужны отзывы о моем коде и общем процессе. Пожалуйста, предложите какие-либо улучшения. Даже капитальный ремонт всей моей системы будет в порядке, если у вас есть предложения о том, как лучше реализовать все это. Книжные рекомендации тоже приветствуются; Я люблю книги. Мне просто кажется, что что-то не так.


Вот общее решение, которое я нашел благодаря ответу Джулиано: (не стесняйтесь комментировать, если у вас есть какие-либо вопросы)

public void fillWriteBuffer() {
    while(!writePackets.isEmpty() && writeBuf.remaining() >= writePackets.peek().size()) {
        Packet p = writePackets.poll();
        assert p != null;
        p.writeTo(writeBuf);
    }
}

public void fillReadPackets() {
    do {
        if(readBuf.position() < 1+2) {
            // haven't yet received the length
            break;
        }

        short packetLength = readBuf.getShort(1);

        if(readBuf.limit() >= 1+2 + packetLength) {
            // we have a complete packet!

            readBuf.flip();

            byte packetType = readBuf.get();

            packetLength = readBuf.getShort();

            byte[] packetData = new byte[packetLength];
            readBuf.get(packetData);

            Packet p = new Packet(packetType, packetData);
            readPackets.add(p);
            readBuf.compact();
        } else {
            // not a complete packet
            break;
        }

    } while(true);
}

Ответы [ 4 ]

7 голосов
/ 12 апреля 2010

Возможно, это не тот ответ, который вы ищете, но кто-то должен был бы сказать это: вы, вероятно, переоцениваете решение для очень простой проблемы.

У вас нет пакетов до их полного прибытия, даже IncomingPacket с. У вас есть только поток байтов без определенного значения. Обычное, простое решение состоит в том, чтобы хранить входящие данные в буфере (это может быть простой массив byte [], но рекомендуется использовать правильный эластичный и циклический буфер, если производительность является проблемой). После каждого чтения вы проверяете содержимое буфера, чтобы увидеть, сможете ли вы извлечь весь пакет оттуда. Если вы можете, вы создаете Packet, отбрасываете правильное количество байтов от начала буфера и повторяете. Если или когда вы не можете извлечь весь пакет, вы сохраняете эти входящие байты там до следующего успешного чтения чего-либо из сокета.

Пока вы это делаете, если вы осуществляете обмен данными на основе дейтаграмм по потоковому каналу, я бы порекомендовал вам указывать магическое число в начале каждого «пакета», чтобы вы могли проверить оба конца соединения все еще синхронизированы. Они могут выйти из синхронизации, если по какой-либо причине (ошибка) один из них читает или записывает неверное число байтов в / из потока.

1 голос
/ 12 апреля 2010

Игнорирование отключений клиента и отключение сервера на данный момент, вот более или менее традиционная структура сокет-сервера:

  • Селектор , ручки розеток:
    • опросы открытых сокетов
    • если это сокет сервера, создайте новый Connection object
    • для каждого активного клиентского сокета найдите Соединение , вызовите его с событием (чтение или запись)
  • Соединение (по одному на розетку), обрабатывает ввод / вывод на одной розетке:
    • Связь с Протоколом через две очереди, вход и выход
    • содержит два буфера, один для чтения, один для записи и соответствующие смещения
    • при событии чтения: прочитать все доступные входные байты, найти границы сообщений, поместить целые сообщения в протокол очередь ввода, вызвать протокол
    • при событии записи: записать буфер или, если он пуст, взять очередь вывода формы сообщения в буфер, начать запись
  • Протокол (по одному на соединение), обрабатывает обмен прикладным протоколом при одном подключении:
    • принять сообщение из входной очереди, разобрать приложение часть сообщения
    • сервер работает (здесь конечный автомат - некоторые сообщения соответствуют одному состоянию, а другие - нет), генерирует ответное сообщение, помещает его в очередь вывода

Вот и все. Все может быть в одной теме. Ключевым моментом здесь является разделение обязанностей. Надеюсь, это поможет.

1 голос
/ 12 апреля 2010

Разве вы не можете просто прочитать любое количество байтов, которые готовы к чтению, и подать все входящие байты в конечный автомат синтаксического анализа пакетов? Это будет означать обработку входящего (TCP) потока данных, как и любого другого входящего потока данных (через последовательную линию, или USB, по каналу или как угодно ...)

Таким образом, у вас есть некоторый селектор, определяющий, из какого соединения (соединений) есть входящие байты для чтения и сколько. Затем вы (для каждого соединения) прочитали бы доступные байты, а затем передали эти байты в (конечный тип соединения) экземпляр конечного автомата (хотя чтение и подача могли бы выполняться из одного и того же класса). Этот класс конечного автомата для анализа пакетов будет время от времени выплевывать готовые пакеты и передавать их тому, кто будет обрабатывать эти полные и проанализированные пакеты.

Для примера формата пакета, например

    2 magic header bytes to mark the start
    2 bytes of payload size (n)
    n bytes of payload data
    2 bytes of checksum

у конечного автомата были бы состояния вроде (попробуйте enum, у Java сейчас такие есть, я так понимаю)

wait_for_magic_byte_0,
wait_for_magic_byte_1,
wait_for_length_byte_0,
wait_for_length_byte_1,
wait_for_payload_byte (with a payload_offset variable counting),
wait_for_chksum_byte_0,
wait_for_chksum_byte_1

и на каждом входящем байте вы можете соответственно менять состояние. Если входящий байт не продвигает конечный автомат должным образом, сбросьте байт, сбросив конечный автомат на wait_for_magic_byte_0.

0 голосов
/ 12 апреля 2010

Я думаю, что вы подходите к вопросу с немного неправильной стороны. Вместо того, чтобы думать о пакетах, подумайте о структуре данных. Это то, что вы отправляете. По сути, да, это пакет прикладного уровня, но просто думайте о нем как об объекте данных. Затем на самом низком уровне напишите подпрограмму, которая будет считывать данные с провода и выводить объекты данных. Это даст вам слой абстракции, который, я думаю, вы ищете.

...