Я узнал о разрушителе и использовал его в своем приложении Spark.Я обнаружил, что прерыватель сбрасывает все сообщения, кроме одного, при выполнении на пряже, но хорошо работает при выполнении на локальной Spark.
Я проверил MessageEventTranslator и MessageEventHandler нарушителя и обнаружил, что когда приложение выполняется на пряже, MessageEventTranslator будетполучить только одно событие, но оно может получить все сообщения при выполнении на локальном Spark.
input.foreachPartition { part => {
val disruptor = new Disruptor[util.Map[String, Object]](new MessageEventFactory, ringBufferSize, new MessageThreadFactory, ProducerType.MULTI, new BlockingWaitStrategy)
disruptor.handleEventsWith(new MessageEventHandler(batchSize, this))
disruptor.setDefaultExceptionHandler(new MessageExceptionHandler)
val ringBuffer = disruptor.start
val producer = new MessageEventProducer(ringBuffer)
part.foreach { row =>
accm.add(1)
producer.onData(row)
}
}
}
class MessageEventTranslator extends EventTranslatorOneArg[util.Map[String, Object], Row] {
private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventTranslator]);
override def translateTo(event: util.Map[String, Object], sequence: Long, arg0: Row): Unit = {
try {
val fields = arg0.schema.fieldNames
for (field <- fields) {
val value = arg0.getAs[Object](field)
event.put(field, value)
}
} catch {
case e:NullPointerException => {
LOGGER.error("MessageEventTranslator.translateTo{}",e.toString())
}
}
}
}
class MessageEventHandler(val batchSize: Integer, val output:OutputFlusher) extends EventHandler[util.Map[String, Object]] {
private val LOGGER = LoggerFactory.getLogger(classOf[MessageEventHandler]);
val queue:util.Queue[util.Map[String,Object]] = new LinkedBlockingQueue[util.Map[String, Object]] ()
override def onEvent(event: util.Map[String, Object], sequence: Long, endOfBatch: Boolean): Unit = {
queue.add(event)
if(queue.size() > batchSize || endOfBatch)
{
output.flush(queue)
}
}
}
Я неправильно его использую?Как я могу это исправить?