Разрушитель отбросит все сообщения, кроме одного, в Spark - PullRequest
0 голосов
/ 28 апреля 2019

Я узнал о разрушителе и использовал его в своем приложении Spark.Я обнаружил, что прерыватель сбрасывает все сообщения, кроме одного, при выполнении на пряже, но хорошо работает при выполнении на локальной Spark.

Я проверил MessageEventTranslator и MessageEventHandler нарушителя и обнаружил, что когда приложение выполняется на пряже, MessageEventTranslator будетполучить только одно событие, но оно может получить все сообщения при выполнении на локальном Spark.

input.foreachPartition { part => {

      val disruptor = new Disruptor[util.Map[String, Object]](new MessageEventFactory, ringBufferSize, new MessageThreadFactory, ProducerType.MULTI, new BlockingWaitStrategy)

      disruptor.handleEventsWith(new MessageEventHandler(batchSize, this))
      disruptor.setDefaultExceptionHandler(new MessageExceptionHandler)
      val ringBuffer = disruptor.start
      val producer = new MessageEventProducer(ringBuffer)

      part.foreach { row =>
        accm.add(1)
        producer.onData(row)

      }
    }
    }


class MessageEventTranslator extends EventTranslatorOneArg[util.Map[String, Object], Row] {
  private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventTranslator]);

  override def translateTo(event: util.Map[String, Object], sequence: Long, arg0: Row): Unit = {

    try {

      val fields = arg0.schema.fieldNames
      for (field <- fields) {
        val value =  arg0.getAs[Object](field)
        event.put(field, value)
      }
    } catch {
      case e:NullPointerException => {
        LOGGER.error("MessageEventTranslator.translateTo{}",e.toString())
      }
    }

  }
}



class MessageEventHandler(val batchSize: Integer, val output:OutputFlusher) extends EventHandler[util.Map[String, Object]] {

  private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventHandler]);

  val queue:util.Queue[util.Map[String,Object]] = new LinkedBlockingQueue[util.Map[String, Object]] ()

  override def onEvent(event: util.Map[String, Object], sequence: Long, endOfBatch: Boolean): Unit = {
    queue.add(event)
    if(queue.size() > batchSize || endOfBatch)
    {
        output.flush(queue)
    }
  }
}

Я неправильно его использую?Как я могу это исправить?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...