Мой код работает правильно, если я отправляю строку, но при попытке использовать JSON получаю сообщение об ошибке.
Файл конфигурации:
/**
 * Spring configuration for the Kafka producer side of the application.
 *
 * <p>Declares a producer factory and a {@code KafkaTemplate} that publish records with
 * {@code String} keys and {@code PriceRangeModel} values.
 */
@Configuration
public class KakfaConfiguration {

    /**
     * Creates the producer factory.
     *
     * <p>The producer targets the broker at {@code 127.0.0.1:9092}, serializes keys with
     * {@code StringSerializer} and values with {@code JsonSerializer}.
     *
     * @return a factory producing {@code String}/{@code PriceRangeModel} producers
     */
    @Bean
    public ProducerFactory<String, PriceRangeModel> produceConofig() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // The value serializer class is only instantiated when the first producer is built,
        // so a problem with it shows up on the first send rather than at bean creation.
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Creates the template used to send {@code PriceRangeModel} messages.
     *
     * @return a template backed by {@link #produceConofig()}
     */
    @Bean
    public KafkaTemplate<String, PriceRangeModel> kafkaTemplate() {
        final ProducerFactory<String, PriceRangeModel> factory = produceConofig();
        return new KafkaTemplate<>(factory);
    }
}
Контроллер Kafka:
/**
 * Publishes a sample {@code PriceRangeModel} message through the injected
 * {@code KafkaTemplate}.
 */
@Controller
public class KafkaController {

    /** Name of the topic the sample message is sent to. */
    private static final String TOPIC = "kafkaTopic";

    @Autowired
    private KafkaTemplate<String, PriceRangeModel> kafkaTemplate;

    /**
     * Sends one hard-coded {@code PriceRangeModel} to the topic and prints progress
     * messages to standard output before and after the call.
     *
     * <p>The value returned by {@code send} is ignored, so the final message only
     * indicates that the call returned without throwing.
     */
    public void publish() {
        System.out.println("In publish method !!!");
        final PriceRangeModel payload = new PriceRangeModel(100, 90, 100, 115);
        kafkaTemplate.send(TOPIC, payload);
        System.out.println("Published !!!!!");
    }
}
Журнал ошибок:
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:489)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:404)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:391)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:463)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:401)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:216)
at com.example.demo.KafkaController.publish(KafkaController.java:15)
at com.example.demo.JsonProducerApplication.main(JsonProducerApplication.java:14)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:321)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:370)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
... 9 more
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 16 more
Файл pom.xml:
<properties>
<!-- Java version for the build (1.8). Presumably picked up by the Spring Boot parent POM
     to configure the compiler; the parent is not visible in this fragment, so confirm. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
    <!-- Core Spring Boot starter. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka: KafkaTemplate, DefaultKafkaProducerFactory, JsonSerializer. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Jackson is required at runtime by JsonSerializer. Without it the producer fails with
         NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper on the first send.
         The plain spring-boot-starter does not bring Jackson in, so it is declared explicitly.
         No version is given, like the other entries here; this assumes the version is supplied
         by the project's dependency management (e.g. the Spring Boot parent) - confirm, or add
         an explicit <version>. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Test support; the JUnit vintage engine is excluded. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Kafka test utilities, test scope only. -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin. No version or configuration is declared here, so both
     presumably come from the parent POM / plugin defaults (not visible in this fragment). -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>