Регистрация слушателей процессора Camel Apache - PullRequest
0 голосов
/ 06 мая 2020

Я пытаюсь передать sh данные, обработанные в процессоре Apache Camel, в класс слушателя. В экземпляре класса процессора я пытаюсь зарегистрировать слушателя во время создания экземпляра контекста Camel, что почему-то не удается. Возможно, я здесь в корне ошибаюсь и это невозможно. Если это так, было бы неплохо, если вы мне скажете.

У меня есть Apache Camel-маршрут, извлекающий JSON сообщения с сервера ActiveMQ и отправляющий эти JSON в пользовательский класс процессора, определенный в Camel -Spring XML:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring
    http://camel.apache.org/schema/spring/camel-spring.xsd">

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="url to ActiveMQ" />
        <property name="clientID" value="clientID" />
        <property name="userName" value="theUser" />
        <property name="password" value="thePassword" />
    </bean>
    <bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
    </bean>
    <bean id="customProcessor" class="...CustomProcessorClass" />
    <camelContext id="matrixProfileContext" xmlns="http://camel.apache.org/schema/spring">  
        <route id="matrixProfileRoute" autoStartup="false">
            <from uri="activemq:queue:queuename" />
            <log message="${body}" />
            <to uri="customProcessor" />
        </route>
    </camelContext>
</beans>

Моя идея состоит в том, что класс CustomProcessor демаршалирует JSON контент, доставляемый по маршруту, и подталкивает POJO к классу слушателя, который реализует интерфейс слушателя:

public interface ProcessorListenerIF {

    public void doOnDataProcessed(POJO processedData);
}

Я тестирую всю настройку с помощью unit-test:

public class TestProcessor extends TestCase {

    @Test
    public void testRoute() throws Exception {
        MyActiveMQConnector camelContext = new MyActiveMQConnector(new TestListener());
        try {
            camelContext.startConnections();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            camelContext.stopConnection();
        }
    }

    private class TestListener implements ProcessorListenerIF {

        @Override
        public void doOnDataProcessed(POJO data) {
            System.out.println(data);
        }
    }
}

У процессора Camel есть два метода:

public void addListener(MatrixProfileProcessorListenerIF listener) {
    _processorListeners.add(listener);
}

@Override
public void process(Exchange exchange) throws Exception {

    Pseudocode: POJO data = unmarshal_by_JSON-JAVA(exchange)

    _processorListeners.parallelStream().forEach(listener -> {
        listener.doOnDataProcessed(data);
    });
}

где я регистрирую слушателя в конструкторе ActiveMQConnector:

public class ActiveMQConnector {

    private SpringCamelContext _camelContext = null;

    public ActiveMQConnector(ProcessorListenerIF listener) {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("camelContext.xml");
        _camelContext = new SpringCamelContext(appContext);
-------------------------------------
        ((CustomProcessor) _camelContext.getProcessor("customProcessor")).addListener(listener);
-------------------------------------
    }

    public void startConnections() throws Exception {
        try {
            _camelContext.start();
        } catch (Exception e) {
            exception handling
        }
    }
... more methods

Строка ((CustomProcessor) ..., выделенная выше, не работает: оператор _camelContext.getProcessor ничего не находит, маршруты в экземпляре _camelContext пусты.

Как Могу ли я реализовать в pu sh обработанные данные от процессора какому-нибудь наблюдателю?

1 Ответ

0 голосов
/ 07 мая 2020

Я нашел другое решение, основанное исключительно на Java без Spring XML.

Модульный тест по-прежнему выглядит так, как указано выше. Вместо определения ActiveMQEndpoint через Spring XML я создал новый класс:

public class MyActiveMQConnection {

    public static ActiveMQConnectionFactory createActiveMQConnectionFactory() {

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://<activemq-url>:<port>");
        connectionFactory.setUserName("myUsername");
        // connection factory configuration:
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID(UUID.randomUUID().toString());
        connectionFactory.setConnectResponseTimeout(300);
        ... whatever ...

        return connectionFactory;
    }
}

Кроме того, я изменил конструктор класса ActiveMQConnector:

public ActiveMQConnector(ProcessorListenerIF listener) throws Exception {

    _camelContext = new DefaultCamelContext();
    _camelContext.addComponent("activemqEndpoint",
            JmsComponent.jmsComponent(MyActiveMQConnection.createActiveMQConnectionFactory()));
    _camelContext.addRoutes(new RouteBuilder() {

        @Override
        public void configure() throws Exception {

            MyCustomProcessor processor = MyCustomProcessor.getInstance();
            processor.addListener(listener);

            from("activemqEndpoint:queue:matrixprofile") //
                    .process(processor) //
                    .to("stream:out");
        }
    });
}

Я реализовал процессор как синглтон выглядит так (для полноты):

public class MyCustomProcessor implements Processor {

    private final Set<ProcessorListenerIF> _processorListeners = new HashSet<>();
    private static volatile MyCustomProcessor      _myInstance         = null;
    private static Object                          _token              = new Object();

    private MyCustomProcessor() {

    }

    public static MyCustomProcessor getInstance() {
        MyCustomProcessor result = _myInstance;
        if (result == null) {
            synchronized (_token) {
                result = _myInstance;
                if (result == null)
                    _myInstance = result = new MyCustomProcessor();
            }
        }
        return result;
    }

    public void addListener(ProcessorListenerIF listener) {
        _processorListeners.add(listener);
    }

    /**
     * I assume the JSON has the following structure:
     * {timestamp: long, data: double[]}
    **/
    @Override
    public void process(Exchange exchange) throws Exception {

        _processorListeners.parallelStream().forEach(listener -> {
            // convert incoming message body to json object assuming data structure above
            JSONObject    jsonObject    = new JSONObject(exchange.getMessage().getBody().toString());
            MyPOJO myPojo = new MyPOJO();

            try {
                myPojo.setTimestamp(jsonObject.getLong("timestamp"));
            } catch (Exception e) {
                ...
            }
            try {
                JSONArray dataArray = jsonObject.getJSONArray("data");
                double[]  data      = new double[dataArray.length()];
                for (int i = 0; i < dataArray.length(); i++) {
                    data[i] = Double.valueOf(dataArray.get(i).toString());
                }
                myPojo.setData(data);
            } catch (Exception e) {
                ...
            }

            listener.doOnDataProcessed(myPojo);
        });
    }
}
...