Это можно сделать с помощью опции concurrency
ListenerContainer
:
Потоки из TaskExecutor
, настроенные в SimpleMessageListenerContainer
, используются для вызова MessageListener
, когдаНовое сообщение доставлено Клиентом RabbitMQ.Если не настроено, используется SimpleAsyncTaskExecutor
.Если используется объединенный исполнитель, убедитесь, что размер пула достаточен для обработки настроенного параллелизма.С DirectMessageListenerContainer
MessageListener
вызывается непосредственно в потоке клиента RabbitMQ.В этом случае taskExecutor
используется для задачи, которая контролирует потребителей.
Пожалуйста, начните читать отсюда: https://docs.spring.io/spring-amqp/docs/current/reference/html/_reference.html#receiving-messages
А также см. Здесь: https://docs.spring.io/spring-amqp/docs/current/reference/html/_reference.html#containerAttributes
concurrentConsumers (параллелизм) - количество одновременных потребителей, которые первоначально запускаются для каждого слушателя.
ОБНОВЛЕНИЕ
Хорошо!Я вижу, что происходит.
У нас есть такой код:
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}
, поэтому мы проверяем возможный параллелизм только после обработки первого сообщения.Итак, в вашем случае это произойдет через 1 минуту.
Еще один флаг для considerAddingAConsumer()
- это вариант consecutiveActiveTrigger
со следующим по умолчанию:
private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
Итак,в вашем случае, чтобы разрешить параллелизацию только следующего сообщения, вы также должны настроить:
/**
* If {@link #maxConcurrentConsumers} is greater then {@link #concurrentConsumers}, and
* {@link #maxConcurrentConsumers} has not been reached, specifies the number of
* consecutive cycles when a single consumer was active, in order to consider
* starting a new consumer. If the consumer goes idle for one cycle, the counter is reset.
* This is impacted by the {@link #txSize}.
* Default is 10 consecutive messages.
* @param consecutiveActiveTrigger The number of consecutive receives to trigger a new consumer.
* @see #setMaxConcurrentConsumers(int)
* @see #setStartConsumerMinInterval(long)
* @see #setTxSize(int)
*/
public final void setConsecutiveActiveTrigger(int consecutiveActiveTrigger) {
Assert.isTrue(consecutiveActiveTrigger > 0, "'consecutiveActiveTrigger' must be > 0");
this.consecutiveActiveTrigger = consecutiveActiveTrigger;
}
на 1
.Потому что 0
не будет работать в любом случае.
Для лучшей производительности вы также можете рассмотреть возможность сделать subscribeToRequestQueue()
с @Async
, чтобы действительно передать обработку из потока потребителя другому, чтобы избежатьэто 1 минута, чтобы ждать еще одного потребителя, чтобы начать.