При настройке приложения-производителя и потребителя с помощью zeromq и protobuff, но со случайным повреждением пакетов на одной и той же машине на стороне потребителя, я столкнулся с тегом конечной группы протокола-сообщения, и ввод неожиданно завершился в середине проблем на местах, эти проблемы я для случайных сообщений, а не для каждого сообщения, частота отказов очень низкая, из 47,00,00,000 до 50,00,00,000 сообщений. 10-20 сообщений теряются.
Первоначально я использовал thread.sleep в своем коде производителя для остановки кода производителя на 10-100 мс и создания потоков для каждого события. Я заменил эту вещь на запланированный executorService и настроил количество потоков в службе исполнителя, а также поток подписчика на стороне потребителя. заменяется на singleThreadPool.
Ниже приведен мой протофайл продюсера, он скомпилирован в версии Java 8 с
proto syntax 3:
syntax = "proto3";
package eventpoc;
import "google/protobuf/timestamp.proto";
option java_package = "ABC";
option java_outer_classname = "XXDSEventsProtos";
message Event1 {
bool mcsInPosition = 1;
double azPos = 2;
double azPosDemand = 3;
double azPosError = 4;
bool azInPosition = 5;
double elPos = 6;
double elPosDemand = 7;
double elPosError = 8;
bool elInPosition = 9;
google.protobuf.Timestamp encodeLatchingTime = 10;
int32 azPosDmdErrCount = 11;
int32 elPosDmdErrCount = 12;
double azWrapPos = 13;
double azWrapPosDemand = 14;
double azWrapPosError = 15;
google.protobuf.Timestamp time = 16;
}
message Event2{
enum Health {
Good = 0;
Ill = 1;
Bad = 2;
interlocked = 3;
unknown =4;
}
Health healthMCS = 1;
string reason = 2;
google.protobuf.Timestamp time = 3;
}
Ниже приведен мой код производителя: производитель использует службу запланированного исполнителя для публикации события 1 с частотой 100 Гц и публикации события 2 с частотой 10 Гц.
val evntRunner1 = new Runnable {
override def run(): Unit = {
if (Event1Publisher.get()) {
val instant = Instant.now()
val timeStamp = Timestamp.newBuilder.setSeconds(instant.getEpochSecond).setNanos(instant.getNano).build()
val Event1 : Event1 = XXDSEventsProtos.Event1.newBuilder()
.setAzPos(azC)
.setElPos(elC)
.setAzPosError(azC)
.setElPosError(elC)
.setAzInPosition(true)
.setElInPosition(true)
.setTime(timeStamp)
//All dummy paramters below
.setMcsInPosition(true)
.setAzPosDemand(azC)
.setElPosDemand(elC)
.setEncodeLatchingTime(timeStamp)
.setAzPosDmdErrCount(1)
.setElPosDmdErrCount(1)
.setAzWrapPos(azC)
.setAzWrapPosDemand(azC)
.setAzWrapPosError(azC)
.build()
try{
if(pubSocket.sendMore("Event1")){
val currPos : Array[Byte] = DSEvent1.toByteArray
if(pubSocket.send(currPos,ZMQ.NOBLOCK)){
println(s"Successfully published Event1 event data ${DSEvent1.getA1},${DSEvent1.getE1},${DSEvent1.getTime} and byteArray: $currPos")
}else{
println(s"!!!!!!!! Error occured while publishing current position : $DSEvent1")
}
}else{
println(s"!!!!!!!! Error occured while publishing current position: $DSEvent1")
}
}catch{
case e : Exception =>
e.printStackTrace()
println("------------------- Exception occured while publishing current position event.---------------------------")
}
}
}
}
val eventRunner2 : Runnable = new Runnable {
override def run(): Unit = {
if (Event2Publisher.get()) {
val instant = Instant.now()
val timeStamp = Timestamp.newBuilder.setSeconds(instant.getEpochSecond).setNanos(instant.getNano).build()
val event2 = XXDSEventsProtos.Event2.newBuilder()
.setHealthMCS(XXDSEventsProtos.Event2.Health.Good)
.setReason("All is well")
.setTime(timeStamp)
.build()
if(pubSocket.sendMore("Health")){
val healthBytes = mcsHealth.toByteArray
if(pubSocket.send(healthBytes,ZMQ.NOBLOCK)){
println(s"Sent health event: $event2 with bytes : $healthBytes")
}else{
println(s"!!!!!!!! Error occured while publishing health information : $mcsHealth")
}
}
}
}
}
Ниже приведен мой потребительский прото-файл:
syntax = "proto3";
package eventpoc;
import "google/protobuf/timestamp.proto";
option java_package = "ABC";
option java_outer_classname = "XXDSEventsProtos";
message Event2{
enum Health {
Good = 0;
Ill = 1;
Bad = 2;
interlocked = 3;
unknown =4;
}
Health healthMCS = 1;
string reason = 2;
google.protobuf.Timestamp time = 3;
}
message Event1 {
// conjunction of azInPosition and elInPosition
bool mcsInPosition = 1;
double azPos = 2;
double azPosDemand = 3;
double azPosError = 4;
bool azInPosition = 5;
double elPos = 6;
double elPosDemand = 7;
double elPosError = 8;
bool elInPosition = 9;
google.protobuf.Timestamp encodeLatchingTime = 10;
int32 azPosDmdErrCount = 11;
int32 elPosDmdErrCount = 12;
double azWrapPos = 13;
double azWrapPosDemand = 14;
double azWrapPosError = 15;
google.protobuf.Timestamp time = 16;
}
Ниже мой потребительский код:
Потребительская сторона singleThreadPool используется для потребления сообщений, публикуемых производителем с разной скоростью. Здесь один поток будет принимать сообщения, публикуемые издателем, потребитель будет слушать сокет подписчика zeromq, преобразующий каждый полученный массив байтов в протокласс с помощью protobuff, отправлять его другим классам, но при декодировании с помощью команд protobuff возникают проблемы из-за неправильного тега конечной группы или неправильной длины сообщения.
val eventSubscriber = new Runnable {
override def run(): Unit = {
while (simEventSubscriber.get()) {
try {
val eventName: String = subscribeSocket.recvStr()
if (subscribeSocket.hasReceiveMore) {
val eventData = subscribeSocket.recv(ZMQ.DONTWAIT)
val hcdReceivalTime = Instant.now
val currentState = messageTransformer.decodeEvent(eventName, eventData)
} else {
log.error(s"No event data is received for event: $eventName")
}
} catch {
case e: Exception =>
e.printStackTrace()
log.error("exception in subscribing events from simulator: ", Map.empty, e, noId)
}
}
}
}
//MessageTransformer code:
override def decodeEvent(eventName: String, encodedEventData: Array[Byte]): Event = {
eventName match {
case Event1 =>
var event: Event1 = null
log.error(s"Decoding event: $eventName for data: $encodedEventData")
try {
event = XXDSEventsProtos.Event1.parseFrom(encodedEventData)
} catch {
case e: Exception =>
e.printStackTrace()
null
}
case Event2 =>
var event: Event2 = null
try {
log.error(s"Decoding event: $eventName for data: $encodedEventData")
event = XXDSEventsProtos.Event2.parseFrom(encodedEventData)
} catch {
case e: Exception =>
log.error("Exception while getting health event", Map.empty, e, noId)
e.printStackTrace()
null
}
}
}
zeromq должен правильно передавать сообщения на той же машине, но во время тестирования производительности я заметил, что некоторые сообщения повреждены во время передачи, я неожиданно заканчиваю ввод, и тег конечной группы сообщений протокола не соответствует ожидаемым проблемам с тегами.
Это случайная проблема, я получаю это для 10-15 сообщений для среднего прогона 12 часов и для 47,00 000 000 - 50 000 000 сообщений, я не получаю то, что нужно сделать для этой проблемы:
com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:86)
com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:110) ABC.protos.EventsProtos$Event2.parseFrom(EventsProtos.java:2166)