Вот две основные версии с boost asio.Обратите внимание, что для правильной работы, технически, во второй версии нужно было бы посмотреть, сколько данных находится в буфере, выяснить, насколько большим был заголовок (VarInt не является фиксированным размером), но CodedInputStream имеет GetDirectBufferPointer,с указателем, где он находится, поэтому из этого указателя можно определить оставшийся размер сообщения, сравнить его с заданным размером сообщения, создать новый скорректированный буфер для оставшегося размера и выполнить асинхронное чтение для остальной частисообщение.Ниже работает, пока сообщения остаются маленькими (я думаю, около 1 КБ или около того).Если у кого-то есть пропущенный бит, пожалуйста, говорите.Спасибо.
writeDelimitedTo в C ++:
boost::asio::streambuf request;
{
std::ostream request_stream(&request);
google::protobuf::io::OstreamOutputStream raw_output (&request_stream);
google::protobuf::io::CodedOutputStream coded_output(&raw_output);
coded_output.WriteVarint32(myProtoMsg.ByteSize());
myProtoMsg.SerializeToCodedStream(&coded_output);
}
boost::asio::write(socket,request);
parseDelimitedFrom:
char buf[5000];
void Session::Read()
{
boost::asio::async_read(
socket,
boost::asio::buffer(buf),
boost::asio::transfer_at_least(1),
boost::bind(&Session::Handle_Read,shared_from_this(),boost::asio::placeholders::error));
}
void Session::Handle_Read(const boost::system::error_code& error)
{
if (!error)
{
google::protobuf::io::ArrayInputStream arrayInputStream(buf,5000);
google::protobuf::io::CodedInputStream codedInputStream(&arrayInputStream);
uint32_t messageSize;
codedInputStream.ReadVarint32(&messageSize);
//Read more here
MyProtoMsg myProtoMsg;
myProtoMsg.ParseFromCodedStream(&codedInputStream);
}
Read();
}
РЕДАКТИРОВАТЬ: вышенемного ленивый (с «подробнее здесь»).Ниже приведен полный синтаксический анализ.Любые комментарии приветствуются.
NEW parseDelimitedFrom:
static void ReadMyVarint32(int& headerSize,int& messageSize,char buffer[])
{
// Fast path: We have enough bytes left in the buffer to guarantee that
// this read won't cross the end, so we can skip the checks.
char const* ptr = buffer;
char b;
uint32_t result;
b = *(ptr++); result = (b & 0x7F) ; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= (b & 0x7F) << 7; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= (b & 0x7F) << 14; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= (b & 0x7F) << 21; if (!(b & 0x80)) goto done;
b = *(ptr++); result |= b << 28; if (!(b & 0x80)) goto done;
// If the input is larger than 32 bits, we still need to read it all
// and discard the high-order bits.
for (int i = 0; i < 5; i++) {
b = *(ptr++); if (!(b & 0x80)) goto done;
}
// We have overrun the maximum size of a varint (10 bytes). Assume
// the data is corrupt.
headerSize = 0;
messageSize = 0;
done:
headerSize = ptr - buffer;
messageSize = (int)result;
}
char buf[5000];
int receivedSize(0);
int missingSize(0);
void Session::Read()
{
boost::asio::async_read(
socket,
boost::asio::buffer(buf),
boost::asio::transfer_at_least(1),
boost::bind(&Session::Handle_Read,shared_from_this(),_1,_2));
}
void Session::Handle_Read(const boost::system::error_code& error,std::size_t bytes_transferred)
{
if (!error)
{
int mybytes_transferred((int)bytes_transferred);
if(missingSize == 0)
{
int headerSize, messageSize;
ReadMyVarint32(headerSize,messageSize,buf);
//std::cout << "Read new message: HeaderSize " << headerSize << " MessageSize " << messageSize << " Received: " << mybytes_transferred << std::endl;
for(int i(0);i<mybytes_transferred-headerSize;++i)
request[i] = buf[headerSize+i];
missingSize = headerSize + messageSize - mybytes_transferred;
receivedSize = mybytes_transferred - headerSize;
}
else
{
//std::cout << "Continue message: Read so far " << receivedSize << " Missing " << missingSize << " Received: " << mybytes_transferred << std::endl;
for(int i(0);i<mybytes_transferred;++i)
request[receivedSize+i] = buf[i];
missingSize -= mybytes_transferred;
receivedSize += mybytes_transferred;
}
if(missingSize < 0)
{
//Received too much, give up
missingSize = 0;
receivedSize = 0;
}
else if(missingSize == 0)
{
// Use your proto class here
RequestWrapperPtr requestWrapperPtr(new RequestWrapper());
if(requestWrapperPtr->ParseFromArray(request,receivedSize))
{
HandleRW(requestWrapperPtr);
}
else
{
// std::cout << BaseString() << "Session Handle_Read: Failed to parse!";
}
}
Read();
}
}