Буферы протокола Google: parseDelimitedFrom и writeDelimitedTo для C ++ - PullRequest
7 голосов
/ 25 ноября 2011

Извините, что спросил это снова, но не могли бы мы, пожалуйста, раз и навсегда опубликовать здесь некоторые функции C ++, которые соответствуют функциям Java?Похоже, они не добавляются в Google, и написать их самостоятельно - довольно сложно.Ниже ответьте, используя некоторые из это и это .

1 Ответ

4 голосов
/ 25 ноября 2011

Вот две основные версии с boost asio.Обратите внимание, что для правильной работы, технически, во второй версии нужно было бы посмотреть, сколько данных находится в буфере, выяснить, насколько большим был заголовок (VarInt не является фиксированным размером), но CodedInputStream имеет GetDirectBufferPointer,с указателем, где он находится, поэтому из этого указателя можно определить оставшийся размер сообщения, сравнить его с заданным размером сообщения, создать новый скорректированный буфер для оставшегося размера и выполнить асинхронное чтение для остальной частисообщение.Ниже работает, пока сообщения остаются маленькими (я думаю, около 1 КБ или около того).Если у кого-то есть пропущенный бит, пожалуйста, говорите.Спасибо.

writeDelimitedTo в C ++:

boost::asio::streambuf request;
{
    std::ostream request_stream(&request);

    google::protobuf::io::OstreamOutputStream raw_output (&request_stream);
    google::protobuf::io::CodedOutputStream coded_output(&raw_output);

    coded_output.WriteVarint32(myProtoMsg.ByteSize());
    myProtoMsg.SerializeToCodedStream(&coded_output);
}
boost::asio::write(socket,request);

parseDelimitedFrom:

char buf[5000];
void Session::Read()
{
    boost::asio::async_read( 
        socket,
        boost::asio::buffer(buf),
        boost::asio::transfer_at_least(1),
        boost::bind(&Session::Handle_Read,shared_from_this(),boost::asio::placeholders::error));
}
void Session::Handle_Read(const boost::system::error_code& error)
{
    if (!error)
    {
        google::protobuf::io::ArrayInputStream arrayInputStream(buf,5000);
        google::protobuf::io::CodedInputStream codedInputStream(&arrayInputStream);
        uint32_t messageSize;
        codedInputStream.ReadVarint32(&messageSize);
        //Read more here
        MyProtoMsg myProtoMsg;
        myProtoMsg.ParseFromCodedStream(&codedInputStream);
    }
    Read();
}

РЕДАКТИРОВАТЬ: вышенемного ленивый (с «подробнее здесь»).Ниже приведен полный синтаксический анализ.Любые комментарии приветствуются.

NEW parseDelimitedFrom:

static void ReadMyVarint32(int& headerSize,int& messageSize,char buffer[])
{
    // Fast path:  We have enough bytes left in the buffer to guarantee that
    // this read won't cross the end, so we can skip the checks.
    char const* ptr = buffer;
    char b;
    uint32_t result;

    b = *(ptr++); result  = (b & 0x7F)      ; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |= (b & 0x7F) <<  7; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |= (b & 0x7F) << 14; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |= (b & 0x7F) << 21; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |=  b         << 28; if (!(b & 0x80)) goto done;

    // If the input is larger than 32 bits, we still need to read it all
    // and discard the high-order bits.
    for (int i = 0; i < 5; i++) {
    b = *(ptr++); if (!(b & 0x80)) goto done;
    }

    // We have overrun the maximum size of a varint (10 bytes).  Assume
    // the data is corrupt.
    headerSize = 0;
    messageSize = 0;

done:
    headerSize = ptr - buffer;
    messageSize = (int)result;
}

char buf[5000];
int receivedSize(0);
int missingSize(0);

void Session::Read()
{
    boost::asio::async_read( 
        socket,
        boost::asio::buffer(buf),
        boost::asio::transfer_at_least(1),
        boost::bind(&Session::Handle_Read,shared_from_this(),_1,_2));
}

void Session::Handle_Read(const boost::system::error_code& error,std::size_t bytes_transferred)
{
    if (!error)
    {
        int mybytes_transferred((int)bytes_transferred);
        if(missingSize == 0)
        {
            int headerSize, messageSize;
            ReadMyVarint32(headerSize,messageSize,buf);
            //std::cout << "Read new message: HeaderSize " << headerSize << " MessageSize " << messageSize << " Received: " << mybytes_transferred << std::endl;

            for(int i(0);i<mybytes_transferred-headerSize;++i)
                request[i] = buf[headerSize+i];

            missingSize = headerSize + messageSize - mybytes_transferred;
            receivedSize = mybytes_transferred - headerSize;
        }
        else
        {
            //std::cout << "Continue message: Read so far " << receivedSize << " Missing " << missingSize << " Received: " << mybytes_transferred << std::endl;
            for(int i(0);i<mybytes_transferred;++i)
                request[receivedSize+i] = buf[i];
            missingSize -= mybytes_transferred;
            receivedSize += mybytes_transferred;
        }
        if(missingSize < 0)
        {
            //Received too much, give up
            missingSize = 0;
            receivedSize = 0;
        }
        else if(missingSize == 0)
        {
            // Use your proto class here
            RequestWrapperPtr requestWrapperPtr(new RequestWrapper());
            if(requestWrapperPtr->ParseFromArray(request,receivedSize))
            {
                HandleRW(requestWrapperPtr);
            }
            else
            {
                // std::cout  << BaseString() << "Session Handle_Read: Failed to parse!";
            }
        }
        Read();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...