Я новичок в kafka, springboot и пытаюсь интегрировать kafka и эластичный поиск в мое приложение springboot.
Когда я пытаюсь запустить приложение springboot, я вижу ошибку ниже:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory]: Factory method 'kafkaListenerContainerFactory' threw exception; nested exception is java.lang.NoSuchMethodError:org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;
My pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.1.0</version>
</dependency>
Application.yml
security:
enabled: true
spring:
resources:
chain:
enabled: true
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group-id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
Класс производителя
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String topic = "users";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(History t){
logger.info("Inside send message to topic");
this.kafkaTemplate.send(topic,"HelloWorld");
}
}
Consumer.java
package com.springboot.kafka;
import com.springboot.model.History;
import com.springboot.repository.HistoryRepository;
import org.apache.kafka.common.protocol.types.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private static final String topic = "users";
@KafkaListener(topics = topic,groupId = "group-id")
public void consume (String t){
logger.info("Message read as " + t);
}
}
Приложение.properties:
logging.level.sql=info
logging.file = /var/tmp/SpringBootAppLog.log
spring.datasource.driver=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/test
spring.datasource.username=postgres
spring.datasource.password=postgres
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.hibernate.ddl-auto=update
spring.data.elasticsearch.cluster-name=my-application
spring.data.elasticsearch.cluster-nodes=localhost:9200
Любая идея о том, что мне не хватает. Любые выводы будут высоко оценены.