Я создал приложение для чтения сообщений с Apache qpid и отправки их на Apache kafka. Я использую Camel со стартером Springboot. Мой Pom выглядит так -
<dependencyManagement>
<dependencies>
<!-- Camel BOM -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-dependencies</artifactId>
<version>${camel.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- ... other BOMs or dependencies ... -->
</dependencies>
</dependencyManagement>
<dependencies>
<!-- starters -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-amqp-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-kafka-starter</artifactId>
</dependency>
<!-- other camel dependencies -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.0.RELEASE</version>
</plugin>
</plugins>
</build>
, а класс приложения Spring -
@SpringBootApplication
public class CamelSpringJmsKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(CamelSpringJmsKafkaApplication.class, args);
}
@Bean
public JmsConnectionFactory jmsConnectionFactory(@Value("${qpidUser}") String qpidUser, @Value("${qpidPassword}") String qpidPassword, @Value("${qpidBrokerUrl}") String qpidBrokerUrl) {
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(qpidPassword, qpidPassword, qpidBrokerUrl);
return jmsConnectionFactory;
}
@Bean
@Primary
public CachingConnectionFactory jmsCachingConnectionFactory(JmsConnectionFactory jmsConnectionFactory) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory);
return cachingConnectionFactory;
}
application.properties -
camel.springboot.main-run-controller = true
camel.component.amqp.enabled = true
camel.component.amqp.connection-factory = jmsCachingConnectionFactory
camel.component.amqp.async-consumer = true
camel.component.amqp.concurrent-consumers = 1
camel.component.amqp.map-jms-message = true
camel.component.amqp.test-connection-on-startup = true
camel.component.kafka.brokers = localhost:9092
qpidBrokerUrl = amqp://localhost:5672?jms.username=guest&jms.password=guest&jms.clientID=clientid2&amqp.vhost=default
qpidUser = guest
qpidPassword = guest
RouteBuilder -
@Component
public class QpidToKafkaRoute extends RouteBuilder {
public void configure() throws Exception {
from("amqp:queue:test")
.log("Received key : ${header.JMSMessageID}, message : ${body}")
.setHeader(KafkaConstants.KEY, header("JMSMessageID"))
.to("kafka:camel")
.log("Sent key : ${headers[kafka.KEY]}, message : ${body}");
}
}
Когда я запускаю это приложение, оно выдает следующее исключение -
org.apache.camel.FailedToStartRouteException: Failed to start route route1 because of null
at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:125) ~[camel-base-3.4.0.jar:3.4.0]
Caused by: java.lang.IllegalArgumentException: connectionFactory must be specified
at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:152) ~[camel-util-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration.createConnectionFactory(JmsConfiguration.java:1629) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration.getOrCreateConnectionFactory(JmsConfiguration.java:773) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration.createListenerConnectionFactory(JmsConfiguration.java:1638) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration.getOrCreateListenerConnectionFactory(JmsConfiguration.java:816) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration.configureMessageListenerContainer(JmsConfiguration.java:1468) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsConfiguration.createMessageListenerContainer(JmsConfiguration.java:725) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsEndpoint.createMessageListenerContainer(JmsEndpoint.java:189) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsEndpoint.createConsumer(JmsEndpoint.java:184) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.component.jms.JmsEndpoint.createConsumer(JmsEndpoint.java:73) ~[camel-jms-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultRoute.addServices(DefaultRoute.java:560) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.DefaultRoute.onStartingServices(DefaultRoute.java:166) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.RouteService.doWarmUp(RouteService.java:153) ~[camel-base-3.4.0.jar:3.4.0]
at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:123) ~[camel-base-3.4.0.jar:3.4.0]
Не могли бы вы подсказать, почему во время автонастройки Springboot не находит connectionFactory? Когда я отлаживаю этот код, я вижу, что создается bean-компонент connectionFactory. Я даже могу увидеть еще одну строку журнала -
CamelContext has only been running for less than a second. If you intend to run Camel for a longer time then you can set the property camel.springboot.main-run-controller=true in application.properties or add spring-boot-starter-web JAR to the classpath.
, однако, если вы видите мой файл application.properties, обязательное свойство присутствует в самом начале.
Еще один журнал, я вижу в начале запуска приложения -
[main] trationDelegate$BeanPostProcessorChecker : Bean 'org.apache.camel.spring.boot.CamelAutoConfiguration' of type [org.apache.camel.spring.boot.CamelAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
Примечание.Один интересный факт: вчера вечером точно такой же код работал нормально, мой рабочий стол только что перезапустил, и ни одного слова не изменилось, и теперь он выдает исключение . Код также можно увидеть здесь - https://github.com/prashantbhardwaj/qpid-to-kafka-using-camel