У меня есть маршрут и загрузка процессора Apache Camel от брокера ActiveMQ.
Код маршрута -
@Component
public class MyRoute extends RouteBuilder {
private String mySubscription;
private MyProcessor myProcessor;
public MyRoute(@Value("${my.topic}") String mySubscription, MyProcessor myProcessor) {
this.mySubscription = mySubscription;
this.myProcessor = myProcessor;
}
@Override
public void configure() {
from(mySubscription)
.unmarshal().json(JsonLibrary.Jackson, MyDTO.class)
.bean(myProcessor, "process(${body})")
.end();
}
}
Код процессора -
@Slf4j
@Component
@AllArgsConstructor
public class MyProcessor {
public void process(MyDTO dto) {
//code that calls HTTP URLs
}
}
Конфигурация похожа наниже -
spring:
application:
name: my_listener
//Bean prefixes
pooledConnectionFactory:
maxConnections: 10
connectionFactory:
brokerURL: ${brokerURL}
redeliveryPolicy:
backOffMultiplier: 2.0
useExponentialBackOff: true
redeliveryDelay: 60000
maximumRedeliveries: 5
component:
forceSendOriginalMessage: true
concurrentConsumers: 15
//A bunch of HTTP URLs
brokerURL: <brokerURL>
При локальном запуске слушателя и указании VisualVM на локального посредника ActiveMQ я вижу 15 потоков с именем, содержащим имя подписки, на вкладке «Темы VisualVM».Если я отправил 4 сообщения, я вижу, как 4 разных потока переходят в рабочее состояние.Я не вижу ни одного потока, идентифицируемого как объект класса процессора, хотя на вкладке MBeans есть один компонент под процессорами.Вызов getTotalExchanges () для этого бина отображает 4 = нет отправленных сообщений.
Параметр concurrentConsumers (15 здесь) создает только 15 потоков для потребления?Продолжается ли обработка через класс процессора по-прежнему последовательно?Или каждый поток подписки вызывает объект процессора в своем собственном потоке, делая логику процессора многопоточным?