Я пытаюсь передать sh данные, обработанные в процессоре Apache Camel, в класс слушателя. В экземпляре класса процессора я пытаюсь зарегистрировать слушателя во время создания экземпляра контекста Camel, что почему-то не удается. Возможно, я здесь в корне ошибаюсь и это невозможно. Если это так, было бы неплохо, если вы мне скажете.
У меня есть Apache Camel-маршрут, извлекающий JSON сообщения с сервера ActiveMQ и отправляющий эти JSON в пользовательский класс процессора, определенный в Camel -Spring XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="url to ActiveMQ" />
<property name="clientID" value="clientID" />
<property name="userName" value="theUser" />
<property name="password" value="thePassword" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="8" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledConnectionFactory" />
</bean>
<bean id="customProcessor" class="...CustomProcessorClass" />
<camelContext id="matrixProfileContext" xmlns="http://camel.apache.org/schema/spring">
<route id="matrixProfileRoute" autoStartup="false">
<from uri="activemq:queue:queuename" />
<log message="${body}" />
<to uri="customProcessor" />
</route>
</camelContext>
</beans>
Моя идея состоит в том, что класс CustomProcessor демаршалирует JSON контент, доставляемый по маршруту, и подталкивает POJO к классу слушателя, который реализует интерфейс слушателя:
public interface ProcessorListenerIF {
public void doOnDataProcessed(POJO processedData);
}
Я тестирую всю настройку с помощью unit-test:
public class TestProcessor extends TestCase {
@Test
public void testRoute() throws Exception {
MyActiveMQConnector camelContext = new MyActiveMQConnector(new TestListener());
try {
camelContext.startConnections();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
camelContext.stopConnection();
}
}
private class TestListener implements ProcessorListenerIF {
@Override
public void doOnDataProcessed(POJO data) {
System.out.println(data);
}
}
}
У процессора Camel есть два метода:
public void addListener(MatrixProfileProcessorListenerIF listener) {
_processorListeners.add(listener);
}
@Override
public void process(Exchange exchange) throws Exception {
Pseudocode: POJO data = unmarshal_by_JSON-JAVA(exchange)
_processorListeners.parallelStream().forEach(listener -> {
listener.doOnDataProcessed(data);
});
}
где я регистрирую слушателя в конструкторе ActiveMQConnector:
public class ActiveMQConnector {
private SpringCamelContext _camelContext = null;
public ActiveMQConnector(ProcessorListenerIF listener) {
ApplicationContext appContext = new ClassPathXmlApplicationContext("camelContext.xml");
_camelContext = new SpringCamelContext(appContext);
-------------------------------------
((CustomProcessor) _camelContext.getProcessor("customProcessor")).addListener(listener);
-------------------------------------
}
public void startConnections() throws Exception {
try {
_camelContext.start();
} catch (Exception e) {
exception handling
}
}
... more methods
Строка ((CustomProcessor) ..., выделенная выше, не работает: оператор _camelContext.getProcessor
ничего не находит, маршруты в экземпляре _camelContext
пусты.
Как Могу ли я реализовать в pu sh обработанные данные от процессора какому-нибудь наблюдателю?