Spring Integration: TaskExecutor и MaxConcurrentConsumers в AmqpInboundChannelAdapter - PullRequest
0 голосов
/ 28 августа 2018

Приложение My Spring Integration использует сообщения из RabbitMQ, преобразует их в сообщение SOAP и выполняет запрос веб-службы.

Возможно получить много (10 - 50) сообщений в секунду из очереди. Или после перезапуска приложения в очереди RabbitMQ может быть много тысяч сообщений.

Каков наилучший из возможных сценариев для обработки до 10 сообщений в параллельных потоках (упорядочение сообщений желательно иметь, но не обязательно, если веб-служба отвечает с ошибкой в ​​работе, то сообщение с ошибкой следует повторить, пока оно не выполнится).

Прослушиватель Amqp не должен потреблять больше сообщений из очереди, поскольку в исполнителе задач доступны не занятые потоки. Я мог бы определить ThreadExecutor в канале, как это:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue);
}

IntegrationFlow integrationFlow = IntegrationFlows
  .from(amqpInboundChannelAdapter)
  .channel(c -> c.executor(exportFlowsExecutor))
  .transform(businessObjectToSoapRequestTransformer)
  .handle(webServiceOutboundGatewayFactory.getObject())
  .get();

Или достаточно определить исполнителя задачи в AmqpInboundChannelAdapter следующим образом и не определять исполнителя задачи канала в определении потока:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.taskExecutor(taskExecutor));
}

Или, может быть, определить исполнителя задачи для канала, подобного варианту 1, но дополнительно установить maxConcurrentConsumers на канальном адаптере следующим образом:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.maxConcurrentConsumers(10));
}

1 Ответ

0 голосов
/ 28 августа 2018

Рекомендуется настроить concurrency на ListenerContainer и разрешить всем последующим процессам происходить в этих потоках из контейнера. Таким образом, вы получите естественное обратное давление, когда больше сообщений не будет опрошено из очереди, потому что поток занят. А с другой стороны, не будет потери сообщений только потому, что с ExecutorChannel после контейнера слушателя вы собираетесь освободить поток опроса, и текущее сообщение будет подтверждено как использованное, но вы можете потерпеть неудачу в нисходящем направлении.

...