Приложение My Spring Integration использует сообщения из RabbitMQ, преобразует их в сообщение SOAP и выполняет запрос веб-службы.
Возможно получить много (10 - 50) сообщений в секунду из очереди.
Или после перезапуска приложения в очереди RabbitMQ может быть много тысяч сообщений.
Каков наилучший из возможных сценариев для обработки до 10 сообщений в параллельных потоках (упорядочение сообщений желательно иметь, но не обязательно, если веб-служба отвечает с ошибкой в работе, то сообщение с ошибкой следует повторить, пока оно не выполнится).
Прослушиватель Amqp не должен потреблять больше сообщений из очереди, поскольку в исполнителе задач доступны не занятые потоки.
Я мог бы определить ThreadExecutor в канале, как это:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue);
}
IntegrationFlow integrationFlow = IntegrationFlows
.from(amqpInboundChannelAdapter)
.channel(c -> c.executor(exportFlowsExecutor))
.transform(businessObjectToSoapRequestTransformer)
.handle(webServiceOutboundGatewayFactory.getObject())
.get();
Или достаточно определить исполнителя задачи в AmqpInboundChannelAdapter следующим образом и не определять исполнителя задачи канала в определении потока:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.taskExecutor(taskExecutor));
}
Или, может быть, определить исполнителя задачи для канала, подобного варианту 1, но дополнительно установить maxConcurrentConsumers на канальном адаптере следующим образом:
@Bean
public AmqpInboundChannelAdapterSMLCSpec
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue)
{
return Amqp.inboundAdapter(connectionFactory, queue)
.configureContainer(c->c.maxConcurrentConsumers(10));
}