java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.&lt;init&gt;()V
1 vote
/ January 16, 2020

AvroPlanCompleteTrigger is a POJO class generated from an Avro schema (Java codegen). The code works when we run it on a local machine.

Avro version: 1.9.1, spark-core 2.4.0, spark-streaming_2.11 2.4.0

Can anyone help, please?

Exception in thread "streaming-job-executor-0" java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.&lt;init&gt;()V from class com.twraalm.replenishment.odr.avro.dto.AvroPlanCompleteTrigger
    at com.twraalm.replenishment.odr.avro.dto.AvroPlanCompleteTrigger.&lt;clinit&gt;(AvroPlanCompleteTrigger.java:22)
    at com.twraalm.replenishment.edf.process.BuzzerOwTrigger$.pushToKafkaPri(BuzzerOwTrigger.scala:97)
    at com.twraalm.replenishment.edf.process.BuzzerOwTrigger$.publishSrcItemPriAgg(BuzzerOwTrigger.scala:117)
    at com.twraalm.replenishment.edf.process.BuzzerService$.updateSrcPriAggregation(BuzzerService.scala:111)
    at com.twraalm.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream$1.apply(BuzzerStreamProcessor.scala:48)
    at com.twraalm.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream$1.apply(BuzzerStreamProcessor.scala:28)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
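
This error pattern usually means two Avro versions are mixed on the cluster classpath: Spark 2.4.x ships with Avro 1.8.x, where the SpecificData no-arg constructor is protected, while a class generated by Avro 1.9.1 invokes that constructor directly from its static initializer. A minimal diagnostic sketch (a hypothetical helper, not from the original code) to confirm which Avro jar the JVM actually loaded:

    // AvroClasspathCheck.java - hypothetical diagnostic; run it with the same
    // classpath as the failing job (driver or executor side).
    public class AvroClasspathCheck {
        public static void main(String[] args) {
            // Prints the jar that SpecificData was resolved from. If this shows
            // an Avro 1.8.x jar while the POJOs were generated with 1.9.1, the
            // IllegalAccessError above is expected.
            java.net.URL location = org.apache.avro.specific.SpecificData.class
                    .getProtectionDomain().getCodeSource().getLocation();
            System.out.println("SpecificData loaded from: " + location);
        }
    }

If the versions do turn out to be mixed, aligning the Avro version compiled against with the one Spark provides (or shading Avro 1.9.1 into the application jar) is the usual fix.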

AvroPlanCompleteTrigger.SCHEMA$ is a static member of the AvroPlanCompleteTrigger Java class, which is generated from the Avro schema. A sketch of the relevant generated members is shown next, followed by our Scala code.
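
For context, the static members that Avro 1.9.x codegen emits look roughly like this (an abbreviated sketch, not the actual generated file; the schema JSON is elided):

    // Abbreviated sketch of Avro 1.9.x codegen output (hypothetical, for context).
    public class AvroPlanCompleteTrigger extends org.apache.avro.specific.SpecificRecordBase {
        // The static member referenced by getSerializedBytes below.
        public static final org.apache.avro.Schema SCHEMA$ =
                new org.apache.avro.Schema.Parser().parse("..."); // schema JSON elided
        // Avro 1.9.x codegen calls the SpecificData constructor in the static
        // initializer. Against an Avro 1.8.x runtime, where that constructor is
        // protected, this is the line that throws the IllegalAccessError.
        private static org.apache.avro.specific.SpecificData MODEL$ =
                new org.apache.avro.specific.SpecificData();
        // ... remaining generated fields, getters and builder omitted ...
    }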

  def pushToKafkaPri[T: Encoder](dataset: Dataset[T], sparkSess: SparkSession): Unit = {
    import sparkSess.implicits._

    // Kafka producer configuration. Properties is Serializable, which matters
    // because props is captured by the foreachPartition closure and shipped to
    // the executors.
    val props = new Properties()
    props.put("bootstrap.servers", priBootStrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val topic = publishPriTopic

    // First use of the Avro-generated class: its static initializer runs here,
    // which is the frame that fails in the trace above (BuzzerOwTrigger.scala:97).
    val dummyObj: AvroPlanCompleteTrigger = AvroPlanCompleteTrigger.newBuilder().build()

    dataset.foreachPartition { iterator =>
      // Drain the partition into a list and hand it to the Kafka sender.
      val edfPcProcessorPrioritizationInput = new java.util.ArrayList[T]()
      while (iterator.hasNext) {
        edfPcProcessorPrioritizationInput.add(iterator.next())
      }
      KafkaAvroMessageSenderToPcProcessor.send(props, topic, edfPcProcessorPrioritizationInput)
    }
  }

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twraalm.replenishment.odr.avro.dto.AvroPlanCompleteTrigger;
// import for EdfPcProcessorPrioritization omitted (its package is not shown in the post)

public class KafkaAvroMessageSenderToPcProcessor implements Serializable {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaAvroMessageSenderToPcProcessor.class);

    public static <T> void send(Properties props, String topic, Collection<T> collection) {
        Producer<String, Object> producer = new KafkaProducer<>(props);
        try {
            for (T row : collection) {
                try {
                    // Key is null; the value is the Avro-serialized payload.
                    ProducerRecord<String, Object> record =
                            new ProducerRecord<>(topic, null, getSerializedBytes(row));
                    producer.send(record);
                } catch (Exception e) {
                    LOGGER.error("Error occurred while serializing message before inserting into {} topic.",
                            topic, e);
                    throw new SerializationException(
                            "Error occurred while serializing message before inserting into topic '" + topic + "'", e);
                }
            }
        } finally {
            producer.close();
        }
    }

    private static <T> byte[] getSerializedBytes(T row) throws IOException {
        // Build a generic record against the schema of the Avro-generated class;
        // touching SCHEMA$ also triggers that class's static initializer.
        GenericRecord genericRecord = new GenericData.Record(AvroPlanCompleteTrigger.SCHEMA$);
        EdfPcProcessorPrioritization edfPcProcessorPrioritization = (EdfPcProcessorPrioritization) row;

        genericRecord.put("groupId", edfPcProcessorPrioritization.groupId());
        genericRecord.put("runUUID", edfPcProcessorPrioritization.runUUID());
        genericRecord.put("wmtItemNumber", String.valueOf(edfPcProcessorPrioritization.wmtItemNumber()));
        genericRecord.put("sourceLocation", edfPcProcessorPrioritization.sourceLocation());
        genericRecord.put("retryCount", edfPcProcessorPrioritization.retryCount());
        genericRecord.put("applicationName", edfPcProcessorPrioritization.applicationName());

        // Serialize to raw Avro binary (datum only, no schema header).
        SpecificDatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(AvroPlanCompleteTrigger.SCHEMA$);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(genericRecord, encoder);
        encoder.flush();
        out.close();
        return out.toByteArray();
    }
}
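
To sanity-check the bytes produced by getSerializedBytes, a round-trip decode against the same schema can be used; below is a minimal sketch (hypothetical test code, not from the original post; it assumes AvroPlanCompleteTrigger is on the classpath):

    import java.io.IOException;

    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    // Hypothetical helper: decodes the raw Avro datum written by
    // getSerializedBytes; a successful read confirms the payload matches SCHEMA$.
    public class RoundTripCheck {
        public static GenericRecord decode(byte[] serializedBytes) throws IOException {
            GenericDatumReader<GenericRecord> reader =
                    new GenericDatumReader<>(AvroPlanCompleteTrigger.SCHEMA$);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null);
            return reader.read(null, decoder); // throws if bytes do not conform to SCHEMA$
        }
    }

Note that binaryEncoder produces a raw datum with no schema header, so any consumer has to know SCHEMA$ out of band.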