Повреждение сообщений в Protobuff и ZeroMQ для длительных тестов - PullRequest
0 голосов
/ 03 января 2019

При настройке приложения-производителя и потребителя с помощью zeromq и protobuff, но со случайным повреждением пакетов на одной и той же машине на стороне потребителя, я столкнулся с тегом конечной группы протокола-сообщения, и ввод неожиданно завершился в середине проблем на местах, эти проблемы я для случайных сообщений, а не для каждого сообщения, частота отказов очень низкая, из 47,00,00,000 до 50,00,00,000 сообщений. 10-20 сообщений теряются.

Первоначально я использовал thread.sleep в своем коде производителя для остановки кода производителя на 10-100 мс и создания потоков для каждого события. Я заменил эту вещь на запланированный executorService и настроил количество потоков в службе исполнителя, а также поток подписчика на стороне потребителя. заменяется на singleThreadPool.

Ниже приведен мой протофайл продюсера, он скомпилирован в версии Java 8 с

proto syntax 3:
        syntax = "proto3";
        package eventpoc;
        import "google/protobuf/timestamp.proto";
        option java_package = "ABC";
        option java_outer_classname = "XXDSEventsProtos";


        message Event1 {
            bool mcsInPosition = 1;
           double azPos = 2;
           double azPosDemand = 3;
           double azPosError = 4;
           bool azInPosition = 5;
           double elPos = 6;
           double elPosDemand = 7;
           double elPosError = 8;
           bool elInPosition = 9;
           google.protobuf.Timestamp encodeLatchingTime = 10;
           int32 azPosDmdErrCount = 11;
           int32 elPosDmdErrCount = 12;
           double azWrapPos = 13;
           double azWrapPosDemand = 14;
           double azWrapPosError = 15;
           google.protobuf.Timestamp time = 16;
        }
        message Event2{
            enum Health {
                Good = 0;
                Ill = 1;
                Bad = 2;
                interlocked = 3;
                unknown =4;
            }
             Health healthMCS = 1;
             string reason = 2;
             google.protobuf.Timestamp time = 3; 
        }

Ниже приведен мой код производителя: производитель использует службу запланированного исполнителя для публикации события 1 с частотой 100 Гц и публикации события 2 с частотой 10 Гц.

    val evntRunner1 = new Runnable {
        override def run(): Unit = {
    if (Event1Publisher.get()) {

            val instant = Instant.now()
            val timeStamp = Timestamp.newBuilder.setSeconds(instant.getEpochSecond).setNanos(instant.getNano).build()
            val Event1 : Event1 =  XXDSEventsProtos.Event1.newBuilder()
              .setAzPos(azC)
              .setElPos(elC)
              .setAzPosError(azC)
              .setElPosError(elC)
              .setAzInPosition(true)
              .setElInPosition(true)
              .setTime(timeStamp)
              //All dummy paramters below
              .setMcsInPosition(true)
              .setAzPosDemand(azC)
              .setElPosDemand(elC)
              .setEncodeLatchingTime(timeStamp)
              .setAzPosDmdErrCount(1)
              .setElPosDmdErrCount(1)
              .setAzWrapPos(azC)
              .setAzWrapPosDemand(azC)
              .setAzWrapPosError(azC)
              .build()
            try{
              if(pubSocket.sendMore("Event1")){
                val currPos : Array[Byte] = DSEvent1.toByteArray
                if(pubSocket.send(currPos,ZMQ.NOBLOCK)){
                  println(s"Successfully published Event1 event data ${DSEvent1.getA1},${DSEvent1.getE1},${DSEvent1.getTime} and byteArray: $currPos")
                }else{
                  println(s"!!!!!!!! Error occured while publishing current position : $DSEvent1")
                }
              }else{
                println(s"!!!!!!!! Error occured while publishing current position: $DSEvent1")
              }
            }catch{
              case e : Exception =>
                e.printStackTrace()
                println("------------------- Exception occured while publishing current position event.---------------------------")
            }
           }
      }
    }

     val eventRunner2 : Runnable =  new Runnable {
        override def run(): Unit = {
          if (Event2Publisher.get()) {
            val instant = Instant.now()
            val timeStamp = Timestamp.newBuilder.setSeconds(instant.getEpochSecond).setNanos(instant.getNano).build()
            val event2 = XXDSEventsProtos.Event2.newBuilder()
              .setHealthMCS(XXDSEventsProtos.Event2.Health.Good)
              .setReason("All is well")
              .setTime(timeStamp)
              .build()

          if(pubSocket.sendMore("Health")){
            val healthBytes = mcsHealth.toByteArray
            if(pubSocket.send(healthBytes,ZMQ.NOBLOCK)){
              println(s"Sent health event: $event2 with bytes : $healthBytes")
            }else{
              println(s"!!!!!!!! Error occured while publishing health information : $mcsHealth")
            }
          }
        }
       }
      }

Ниже приведен мой потребительский прото-файл:

 syntax = "proto3";
    package eventpoc;
    import "google/protobuf/timestamp.proto";
    option java_package = "ABC";
    option java_outer_classname = "XXDSEventsProtos";
    message Event2{
        enum Health {
            Good = 0;
            Ill = 1;
            Bad = 2;
            interlocked = 3;
            unknown =4;
        }
         Health healthMCS = 1;
         string reason = 2;
         google.protobuf.Timestamp time = 3; 
    }
    message Event1 {
      // conjunction of azInPosition and elInPosition
       bool mcsInPosition = 1;
       double azPos = 2;
       double azPosDemand = 3;
       double azPosError = 4;
       bool azInPosition = 5;
       double elPos = 6;
       double elPosDemand = 7;
       double elPosError = 8;
       bool elInPosition = 9;
       google.protobuf.Timestamp encodeLatchingTime = 10;
       int32 azPosDmdErrCount = 11;
       int32 elPosDmdErrCount = 12;
       double azWrapPos = 13;
       double azWrapPosDemand = 14;
       double azWrapPosError = 15;
       google.protobuf.Timestamp time = 16;
    }

Ниже мой потребительский код: Потребительская сторона singleThreadPool используется для потребления сообщений, публикуемых производителем с разной скоростью. Здесь один поток будет принимать сообщения, публикуемые издателем, потребитель будет слушать сокет подписчика zeromq, преобразующий каждый полученный массив байтов в протокласс с помощью protobuff, отправлять его другим классам, но при декодировании с помощью команд protobuff возникают проблемы из-за неправильного тега конечной группы или неправильной длины сообщения.

            val eventSubscriber = new Runnable {
                override def run(): Unit = {
                  while (simEventSubscriber.get()) {
                    try {
                      val eventName: String = subscribeSocket.recvStr()
                      if (subscribeSocket.hasReceiveMore) {
                        val eventData       = subscribeSocket.recv(ZMQ.DONTWAIT)
                        val hcdReceivalTime = Instant.now
                        val currentState    = messageTransformer.decodeEvent(eventName, eventData)
                      } else {
                        log.error(s"No event data is received for event: $eventName")
                      }
                    } catch {
                      case e: Exception =>
                        e.printStackTrace()
                        log.error("exception in subscribing events from simulator: ", Map.empty, e, noId)
                    }
                  }
                }
              }

//MessageTransformer code:
            override def decodeEvent(eventName: String, encodedEventData: Array[Byte]): Event = {
                eventName match {
                  case Event1 =>
                    var event: Event1 = null
                    log.error(s"Decoding event: $eventName for data: $encodedEventData")
                    try {
                      event = XXDSEventsProtos.Event1.parseFrom(encodedEventData)

                    } catch {
                      case e: Exception =>
                        e.printStackTrace()
                        null
                    }

                  case Event2 =>
                    var event: Event2 = null
                    try {
                      log.error(s"Decoding event: $eventName for data: $encodedEventData")
                      event = XXDSEventsProtos.Event2.parseFrom(encodedEventData)
                     } catch {
                      case e: Exception =>
                        log.error("Exception while getting health event", Map.empty, e, noId)
                        e.printStackTrace()
                        null
                    }
                }

              }

zeromq должен правильно передавать сообщения на той же машине, но во время тестирования производительности я заметил, что некоторые сообщения повреждены во время передачи, я неожиданно заканчиваю ввод, и тег конечной группы сообщений протокола не соответствует ожидаемым проблемам с тегами. Это случайная проблема, я получаю это для 10-15 сообщений для среднего прогона 12 часов и для 47,00 000 000 - 50 000 000 сообщений, я не получаю то, что нужно сделать для этой проблемы:

com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
       com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:86)

com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
        com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:110) ABC.protos.EventsProtos$Event2.parseFrom(EventsProtos.java:2166)
...