Существуют ли C ++ эквиваленты для функций ввода-вывода с разделителями из буфера протокола в Java? - PullRequest
61 голосов
/ 26 февраля 2010

Я пытаюсь читать / записывать несколько сообщений Protocol Buffers из файлов, как на C ++, так и на Java. Google предлагает писать префиксы длины перед сообщениями, но по умолчанию это сделать невозможно (что я видел).

Однако API Java в версии 2.1.0 получил набор функций ввода-вывода с разделителями, которые, очевидно, выполняют эту работу:

parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo

Существуют ли эквиваленты в C ++? А если нет, то какой проводной формат используется для префиксов размера, который присоединяет API Java, чтобы я мог проанализировать эти сообщения в C ++?


Обновление:

Теперь они существуют в google/protobuf/util/delimited_message_util.h, начиная с версии 3.3.0.

Ответы [ 10 ]

72 голосов
/ 08 апреля 2014

Я немного опоздал на вечеринку здесь, но нижеприведенные реализации включают в себя некоторые оптимизации, отсутствующие в других ответах, и не потерпят неудачу после 64 МБ ввода (хотя все равно применяется ограничение 64 МБ для каждого отдельное сообщение, только не на весь поток).

(я являюсь автором библиотек protobuf на C ++ и Java, но я больше не работаю в Google. Извините, что этот код никогда не попал в официальную библиотеку. Вот так он выглядел бы, если бы имел).

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}
17 голосов
/ 26 февраля 2010

Хорошо, поэтому я не смог найти функции C ++ верхнего уровня, реализующие то, что мне нужно, но некоторые подробные сведения по ссылке на API Java обнаружили следующее в интерфейсе MessageLite :

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

Таким образом, префикс размера Java - это вариант (Protocol Buffers)!

Вооружившись этой информацией, я начал копаться в API C ++ и нашел заголовок CodedStream , который имеет следующие данные:

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

Используя их, я смогу прокрутить свои собственные функции C ++, которые выполняют эту работу.

Они действительно должны добавить это к основному API сообщений; у него отсутствует функциональность, учитывая то, что в Java он есть, как и у великолепного порта C # protobuf-net Marc Gravell (через SerializeWithLengthPrefix и DeserializeWithLengthPrefix).

12 голосов
/ 26 февраля 2010

Я решил ту же проблему, используя CodedOutputStream / ArrayOutputStream для записи сообщения (с размером) и CodedInputStream / ArrayInputStream для чтения сообщения (с размером).

Например, следующий псевдокод записывает размер сообщения, следующий за сообщением:

const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);

При написании вы должны также убедиться, что ваш буфер достаточно велик, чтобы вместить сообщение (включая размер). И при чтении вы должны проверить, что ваш буфер содержит целое сообщение (включая размер).

Это определенно было бы удобно, если бы они добавили удобные методы в C ++ API, подобные тем, которые предоставляются Java API.

7 голосов
/ 31 декабря 2015

Я столкнулся с одной и той же проблемой в C ++ и Python.

Для версии C ++ я использовал смесь кода, который Кентон Варда разместил в этой теме, и код из запроса на извлечение, который он отправил команде protobuf (поскольку размещенная здесь версия не обрабатывает EOF, а ту, которую он отправил GitHub делает).

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}

А вот моя реализация python2:

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

Это может быть не самый привлекательный код, и я уверен, что он может быть подвергнут рефакторингу, но по крайней мере это должно показать вам один из способов сделать это.

Теперь большая проблема: это МЕДЛЕННО .

Даже при использовании реализации C ++ для python-protobuf он на один порядок медленнее, чем в чистом C ++. У меня есть тест, где я читаю из файла 10M протобуф-сообщений размером ~ 30 байт каждое. Это занимает ~ 0,9 с в C ++ и 35 с в Python.

Один из способов сделать это немного быстрее - повторно внедрить декодер varint, чтобы он считывал из файла и декодировал за один раз вместо чтения из файла, а затем декодировал, как этот код делает в настоящее время. (профилирование показывает, что значительное количество времени затрачивается на кодер / декодер varint). Но само собой разумеется, что одного недостаточно, чтобы сократить разрыв между версией Python и версией C ++.

Любая идея сделать это быстрее приветствуется :)

7 голосов
/ 15 ноября 2012

IsteamInputStream очень хрупок по отношению к eofs и другим ошибкам, которые легко возникают при использовании вместе с std :: istream. После этого потоки protobuf постоянно повреждаются, а все уже использованные данные буфера уничтожаются. В protobuf есть правильная поддержка чтения из традиционных потоков.

Реализуйте google::protobuf::io::CopyingInputStream и используйте его вместе с CopyingInputStreamAdapter . Сделайте то же самое для вариантов вывода.

На практике вызов синтаксического анализа заканчивается в google::protobuf::io::CopyingInputStream::Read(void* buffer, int size), где дается буфер. Единственное, что осталось сделать, это как-то прочитать.

Вот пример для использования с синхронизированными потоками Asio ( SyncReadStream / SyncWriteStream ):

#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

Использование:

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
 }

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);
7 голосов
/ 02 октября 2012

Вот, пожалуйста:

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>

using namespace google::protobuf::io;

class FASWriter 
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;

    IstreamInputStream *_IstreamInputStream;
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
    }

    template<class T>
    bool ReadNext()
    {
        T msg;
        unsigned __int32 size;

        bool ret;
        if ( ret = _CodedInputStream->ReadVarint32(&size) )
        {   
            CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
            if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
            {
                _CodedInputStream->PopLimit(msgLimit);      
                std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
            }
        }

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};
3 голосов
/ 30 июня 2011

Также искал решение для этого. Вот ядро ​​нашего решения, предполагая, что некоторый код Java записал много сообщений MyRecord с writeDelimitedTo в файл. Откройте файл и выполните цикл:

if(someCodedInputStream->ReadVarint32(&bytes)) {
  CodedInputStream::Limit msgLimit = someCodedInputStream->PushLimit(bytes);
  if(myRecord->ParseFromCodedStream(someCodedInputStream)) {
    //do your stuff with the parsed MyRecord instance
  } else {
    //handle parse error
  }
  someCodedInputStream->PopLimit(msgLimit);
} else {
  //maybe end of file
}

Надеюсь, это поможет.

0 голосов
/ 05 апреля 2016

Поскольку мне не разрешено писать это как комментарий к ответу Кентона Варды выше; Я считаю, что в коде, который он опубликовал, есть ошибка (а также в других ответах, которые были предоставлены). Следующий код:

...
google::protobuf::io::CodedInputStream input(rawInput);

// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

устанавливает неправильный лимит, потому что он не учитывает размер varint32, который уже был прочитан из ввода. Это может привести к потере / повреждению данных, так как из потока считываются дополнительные байты, которые могут быть частью следующего сообщения. Обычный способ справиться с этим правильно - удалить CodedInputStream, используемый для чтения размера, и создать новый для чтения полезной нагрузки:

...
uint32_t size;
{
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  if (!input.ReadVarint32(&size)) return false;
}

google::protobuf::io::CodedInputStream input(rawInput);

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...
0 голосов
/ 14 декабря 2013

Работая с объективной версией буферов протокола, я столкнулся с этой проблемой. При отправке с клиента iOS на сервер на базе Java, который использует parseDelimitedFrom, который ожидает длину в качестве первого байта, мне нужно было сначала вызвать writeRawByte для CodedOutputStream. Публикация здесь, чтобы помочь другим людям, которые сталкиваются с этой проблемой. Работая над этой проблемой, можно подумать, что у протобуфов Google будет просто флаг, который сделает это за вас ...

    Request* request = [rBuild build];

    [self sendMessage:request];
} 


- (void) sendMessage:(Request *) request {

    //** get length
    NSData* n = [request data];
    uint8_t len = [n length];

    PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
    //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
    [os writeRawByte:len];
    [request writeToCodedOutputStream:os];
    [os flush];
}
0 голосов
/ 26 февраля 2010

Вы можете использовать getline для чтения строки из потока, используя указанный разделитель:

istream& getline ( istream& is, string& str, char delim );

(определено в заголовке)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...