Исключения при чтении сообщений protobuf в Java - PullRequest
3 голосов
/ 02 июля 2011

Я использую protobuf уже несколько недель, но я все еще получаю исключения при разборе сообщений protobuf в Java .

Я использую C ++ для создания своих протобуф-сообщений и отправляю их с буст-сокетами в сокет сервера, где клиент Java слушает. Код C ++ для передачи сообщения:

boost::asio::streambuf b;
std::ostream os(&b);

ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);

delete coded_output;
delete raw_output;

boost::system::error_code ignored_error;

boost::asio::async_write(socket, b.data(), boost::bind(
        &MessageService::handle_write, this,
        boost::asio::placeholders::error));

Как вы можете видеть, я пишу WriteVarint32 длину сообщения, поэтому Java-сторона должна знать, используя parseDelimitedFrom, как далеко она должна читать:

AgentMessage agentMessage = AgentMessageProtos.AgentMessage    
                                .parseDelimitedFrom(socket.getInputStream());

Но это не поможет, я продолжаю получать такие исключения:

Protocol message contained an invalid tag (zero).
Message missing required fields: ...
Protocol message tag had invalid wire type.
Protocol message end-group tag did not match expected tag.
While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either than the input has been truncated or that an embedded message misreported its own length.

важно знать, что эти исключения не появляются в каждом сообщении. Это лишь малая часть сообщений, которые я получаю больше всего, просто отлично, но все же я бы хотел это исправить, так как я не хочу опускать сообщения.

Буду очень признателен, если кто-нибудь сможет мне помочь или потратит свои идеи.


Другим интересным фактом является количество сообщений, которые я получаю. Общее количество сообщений 1.000 за 2 секунды обычно для моей программы. Через 20 секунд около 100.000 и так далее. Я сократил количество сообщений, отправляемых искусственно, и когда передается только 6-8 сообщений, ошибок нет вообще. Так может ли это быть проблемой буферизации на стороне сокета Java-клиента?

Например, 60 000 сообщений, в среднем 5 из них повреждены.

Ответы [ 2 ]

1 голос
/ 14 апреля 2014

[Я не совсем эксперт по TCP, это может быть далеко]

Проблема в том, что [100] * сокета [Java] TCP Socket вернется после чтения до конца кадра TCP. Если это окажется в середине сообщения (я имею в виду , то есть сообщение protobuf ), парсер захлебнется и выбросит InvalidProtocolBufferException.

Любой вызов синтаксического анализа protobuf использует CodedInputStream внутренне ( src здесь ), который в случае, если источником является InputStream, опирается на read() - и, следовательно, подчиняется Проблема с сокетом TCP.

Итак, когда вы заполняете большие объемы данных через сокет, некоторые сообщения должны быть разделены на два кадра - и вот где они будут повреждены.

Я предполагаю, что когда вы понижаете скорость передачи сообщений (как вы сказали до 6-8 сообщений в секунду), каждый кадр отправляется до того, как следующий фрагмент данных помещается в поток, поэтому каждое сообщение всегда получает свою собственную Фрейм TCP, т. Е. Никто не разбивается и не получает ошибок. (Или, может быть, просто ошибки редки, а низкая частота означает, что вам нужно больше времени, чтобы их увидеть)

Что касается решения, то лучше всего самостоятельно обработать буфер, то есть прочитать byte[] из сокета (вероятно, используя readFully() вместо read(), потому что первый будет блокировать до тех пор, пока не будет достаточно данных для заполнить буфер [или EOF встречается], так что он вроде устойчив к завершению фрейма середины сообщения), убедиться, что у него достаточно данных для анализа в целое сообщение, а затем передать буфер в анализатор.

Кроме того, есть хорошая статья на эту тему в этой теме групп Google - вот где я получил часть readFully().

1 голос
/ 03 июля 2011

Я не знаком с Java API, но мне интересно, как Java работает со значением uint32, обозначающим длину сообщения, потому что Java имеет только 32-разрядные целые числа со знаком. Быстрый просмотр справки по API Java показал, что 32-разрядное значение без знака хранится в 32-разрядной переменной со знаком. Так как же обрабатывается случай, когда 32-разрядное значение без знака обозначает длину сообщения? Кроме того, кажется, что есть поддержка целых чисел со знаком varint в реализации Java. Они называются ZigZag32 / 64. AFAIK, версия C ++ не знает о таких кодировках. Так может быть причина вашей проблемы может быть связана с этими вещами?

...