Я пытаюсь настроить производителя kafka с KafkaAvroSerialzer для значения. И я сталкиваюсь с этой ошибкой, когда Рит пытается создать Продюсера. Я использую все банки, представленные в слиянии 5.2.1
java.lang.NoClassDefFoundError: Could not initialize class io.confluent.kafka.schemaregistry.client.rest.RestService
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:104)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:81)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:53)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:43)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:370)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at swing.KafkaUtilityPanel.sendMessage(KafkaUtilityPanel.java:229)
at swing.KafkaUtilityPanel.access$1200(KafkaUtilityPanel.java:32)
at swing.KafkaUtilityPanel$4.actionPerformed(KafkaUtilityPanel.java:164)
at javax.swing.AbstractButton.fireActionPerformed(AbstractButton.java:2022)
at javax.swing.AbstractButton$Handler.actionPerformed(AbstractButton.java:2348)
at javax.swing.DefaultButtonModel.fireActionPerformed(DefaultButtonModel.java:402)
at javax.swing.DefaultButtonModel.setPressed(DefaultButtonModel.java:259)
at javax.swing.plaf.basic.BasicButtonListener.mouseReleased(BasicButtonListener.java:252)
at java.awt.Component.processMouseEvent(Component.java:6533)
at javax.swing.JComponent.processMouseEvent(JComponent.java:3324)
at java.awt.Component.processEvent(Component.java:6298)
at java.awt.Container.processEvent(Container.java:2236)
at java.awt.Component.dispatchEventImpl(Component.java:4889)
at java.awt.Container.dispatchEventImpl(Container.java:2294)
at java.awt.Component.dispatchEvent(Component.java:4711)
at java.awt.LightweightDispatcher.retargetMouseEvent(Container.java:4888)
at java.awt.LightweightDispatcher.processMouseEvent(Container.java:4525)
at java.awt.LightweightDispatcher.dispatchEvent(Container.java:4466)
at java.awt.Container.dispatchEventImpl(Container.java:2280)
at java.awt.Window.dispatchEventImpl(Window.java:2746)
at java.awt.Component.dispatchEvent(Component.java:4711)
at java.awt.EventQueue.dispatchEventImpl(EventQueue.java:758)
at java.awt.EventQueue.access$500(EventQueue.java:97)
at java.awt.EventQueue$3.run(EventQueue.java:709)
at java.awt.EventQueue$3.run(EventQueue.java:703)
at java.security.AccessController.doPrivileged(Native Method)
at java.security.ProtectionDomain$JavaSecurityAccessImpl.doIntersectionPrivilege(ProtectionDomain.java:80)
at java.security.ProtectionDomain$JavaSecurityAccessImpl.doIntersectionPrivilege(ProtectionDomain.java:90)
at java.awt.EventQueue$4.run(EventQueue.java:731)
at java.awt.EventQueue$4.run(EventQueue.java:729)
at java.security.AccessController.doPrivileged(Native Method)
at java.security.ProtectionDomain$JavaSecurityAccessImpl.doIntersectionPrivilege(ProtectionDomain.java:80)
at java.awt.EventQueue.dispatchEvent(EventQueue.java:728)
at java.awt.EventDispatchThread.pumpOneEventForFilters(EventDispatchThread.java:201)
at java.awt.EventDispatchThread.pumpEventsForFilter(EventDispatchThread.java:116)
at java.awt.EventDispatchThread.pumpEventsForHierarchy(EventDispatchThread.java:105)
at java.awt.EventDispatchThread.pumpEvents(EventDispatchThread.java:101)
at java.awt.EventDispatchThread.pumpEvents(EventDispatchThread.java:93)
at java.awt.EventDispatchThread.run(EventDispatchThread.java:82)
Производитель
Это часть производителя, которую я добавил, и она не может создать производителя kafka с помощью KafkaAvroSerializer.
try {
//Configure connection properties
Properties producerConfig = createKafkaProducerProperties(environment);
//Create Kafka Producer
try (KafkaProducer<String, GenericRecord> myProducer = new KafkaProducer<>(producerConfig)) {
//Create ProducerRecord
Schema schema = new Schema.Parser().parse(new File("src/main/resources/investorOnboarding.avsc"));
GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema);
GenericData.Record genericRecord = genericRecordBuilder.build();
final ProducerRecord<String, GenericRecord> myProducerRecord = new ProducerRecord<>(topicName.getCode(), genericRecord);
//Send the message
myProducer.send(myProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null){
alertSuccessStatus("Message has been successfully sent to Kafka");
}
}
});
//Disconnect
myProducer.close();
}
} catch (Exception ex) {
logger.info(ex.getMessage());
}
}
private Properties createKafkaProducerProperties(Environment implementation) throws UnknownHostException, PasswordProviderException {
Properties producerConfig = new Properties();
producerConfig.put("client.id", InetAddress.getLocalHost().getHostName());
producerConfig.put("bootstrap.servers", implementation.getBootStrapServers());
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerConfig.put("schema.registry.url", "http://localhost:8085");
Добавлены банки