У нас есть верблюжий маршрут, который читает тему, а затем выполняет некоторые преобразования для полученного сообщения.
Учитывая длительную тему, за один раз обрабатывается только одно сообщение, пока маршрут не завершит свою работу.
Для достижения параллелизма добавлен пул потоков, поэтому после получения сообщения из темы для дальнейшей асинхронной работы потоки создаются, но последовательно.Например, как только сообщение получено, поток выбирается из пула и начинает обработку, пока этот поток не завершит обработку, следующее сообщение не будет получено.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:broker="http://activemq.apache.org/schema/core" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<bean id="threadPool" class="java.util.concurrent.Executors" factory-method="newFixedThreadPool">
<constructor-arg index="0" value="10"/>
</bean>
<camel:camelContext id="camel-etl" trace="true"
xmlns="http://camel.apache.org/schema/spring">
<route id="topicRoute" errorHandlerRef="deadLetterErrorHandler" >
<from uri="{{inbound.topic}}"/>
<camel:threads executorServiceRef="threadPool">
<choice>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<when>
...
...
<multicast>
some loigc ...
</multicast>
<bean ref="persistData"/>
</when>
<otherwise>
...
<bean ref="deadLetterErrorHandler"/>
</otherwise>
</choice>
</camel:threads>
</route>
</camel:camelContext>
<!-- XSLT config -->
<bean id="saxonFactory" class="net.sf.saxon.TransformerFactoryImpl"/>
<!-- custom component beans -->
<bean id="persistData" class="com.data.PersistBean"/>
</beans>