AvroPlanCompleteTrigger — это POJO-класс, сгенерированный из Avro-схемы (Avro Java). Код работает, когда мы запускаем его на локальном компьютере.
Версии зависимостей: avro 1.9.1, spark-core 2.4.0, spark-streaming_2.11 2.4.0.
Может кто-нибудь помочь, пожалуйста?
Exception in thread "streaming-job-executor-0" java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.()V from class com.twraalm.replenishment.odr.avro.dto.AvroPlanCompleteTrigger
at com.twraalm.replenishment.odr.avro.dto.AvroPlanCompleteTrigger.(AvroPlanCompleteTrigger.java:22)
at com.twraalm.replenishment.edf.process.BuzzerOwTrigger$.pushToKafkaPri(BuzzerOwTrigger.scala:97)
at com.twraalm.replenishment.edf.process.BuzzerOwTrigger$.publishSrcItemPriAgg(BuzzerOwTrigger.scala:117)
at com.twraalm.replenishment.edf.process.BuzzerService$.updateSrcPriAggregation(BuzzerService.scala:111)
at com.twraalm.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream$1.apply(BuzzerStreamProcessor.scala:48)
at com.twraalm.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream$1.apply(BuzzerStreamProcessor.scala:28)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Ниже приведен код на языке Scala. AvroPlanCompleteTrigger.SCHEMA$ — это статический член Java-класса AvroPlanCompleteTrigger, сгенерированного из Avro-схемы.
/**
 * Serializes every partition of `dataset` and publishes the rows to the
 * priority Kafka topic via [[KafkaAvroMessageSenderToPcProcessor]].
 *
 * @param dataset   rows to publish; `T` must have an implicit `Encoder`
 * @param sparkSess active Spark session (its implicits are imported for Dataset ops)
 */
def pushToKafkaPri[T: Encoder](dataset: Dataset[T], sparkSess: SparkSession): Unit = {
  import sparkSess.implicits._
  // Producer configuration. java.util.Properties is Serializable, so it can be
  // captured by the foreachPartition closure and shipped to the executors.
  val props: Properties = new Properties()
  props.put("bootstrap.servers", priBootStrapServers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  val topic = publishPriTopic
  // NOTE(review): dummyObj is never read, but building it forces the static
  // initializer of the Avro-generated class to run here on the driver. This is
  // exactly where the reported IllegalAccessError surfaces when the Avro
  // version on the runtime classpath (Spark 2.4 bundles Avro 1.8.x) differs
  // from the one the class was generated with (1.9.1) — align the Avro
  // versions, or shade/relocate the dependency; the code itself is not at fault.
  val dummyObj: AvroPlanCompleteTrigger = AvroPlanCompleteTrigger.newBuilder().build()
  dataset.foreachPartition { iterator: Iterator[T] =>
    // Materialize the partition into a Java list for the Java sender API.
    val batch = new java.util.ArrayList[T]()
    iterator.foreach(batch.add)
    KafkaAvroMessageSenderToPcProcessor.send(props, topic, batch)
  }
}
public class KafkaAvroMessageSenderToPcProcessor implements Serializable {
public static <T> void send(Properties props, String topic, Collection<T> collection) {
Producer<String, Object> producer = new KafkaProducer(props);
try {
for (T row : collection) {
ProducerRecord<String, Object> record = null;
try {
record = new ProducerRecord<>(topic,null,getSerializedBytes(row));
producer.send(record);
} catch (Exception e) {
String msg = "Error occured while serializing message before inserting into topic: ";
LOGGER.error("Error occured while serializing message before inserting into {} topic.", topic,e);
throw new SerializationException(msg + topic + "'", e);
}
}
} finally {
producer.close();
}
}
private static <T> byte [] getSerializedBytes (T row) throws IOException {
GenericRecord genericRecord = new GenericData.Record(AvroPlanCompleteTrigger.SCHEMA$);
EdfPcProcessorPrioritization edfPcProcessorPrioritization = (EdfPcProcessorPrioritization) row;
genericRecord.put("groupId", edfPcProcessorPrioritization.groupId());
genericRecord.put("runUUID", edfPcProcessorPrioritization.runUUID());
genericRecord.put("wmtItemNumber", String.valueOf(edfPcProcessorPrioritization.wmtItemNumber()));
genericRecord.put("sourceLocation", edfPcProcessorPrioritization.sourceLocation());
genericRecord.put("retryCount", edfPcProcessorPrioritization.retryCount());
genericRecord.put("applicationName", edfPcProcessorPrioritization.applicationName());
SpecificDatumWriter writer = new SpecificDatumWriter<GenericData>(AvroPlanCompleteTrigger.SCHEMA$);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(genericRecord, encoder);
encoder.flush();
out.close();
byte [] serializedBytes = out.toByteArray();
return serializedBytes;
}
}