Я надеюсь, что это простая проблема конфигурации, но я не могу понять, что это может быть.
Настройка
- Spring-Boor 2.2.2.RELEASE
- облачный стартер
- облачный стартер- aws
- spring-jms
- spring-cloud- зависимости Hoxton.SR1
- amazon-sqs- java -messaging-lib 1.0.8
Проблема
Мое приложение запускается нормально и начинает обрабатывать сообщения от Amazon SQS. Через некоторое время я вижу следующее предупреждение
2020-02-01 04: 16: 21.482 LogLevel = WARN 1 --- [ecutor-thread14] osjlDefaultMessageListenerContainer: Количество запланированных потребителей упало ниже Ограничение concurrentConsumers, вероятно, из-за отклоненных задач. Проверьте конфигурацию пула потоков! Автоматическое восстановление c, которое будет инициировано оставшимися потребителями.
Вышеуказанное предупреждение будет напечатано несколько раз, и в итоге я вижу следующие два INFO сообщения
2020-02-01 04: 17: 51.552 LogLevel = INFO 1 --- [ecutor-thread40] c .asjavamessaging.SQSMessageConsumer: выключение исполнителя ConsumerPrefetch
2020-02-01 04:18 : 06.640 LogLevel = INFO 1 --- [ecutor-thread40] com.amazon.sqs.javamessaging.SQSSession: Выключение SessionCallBackScheduler executor
Вышеуказанные 2 сообщения будут отображаться несколько раз, а в какой-то момент нет больше сообщений потребляется из SQS. Я не вижу в своем журнале других сообщений, указывающих на проблему, но я не получаю сообщений от моих обработчиков о том, что они обрабатывают сообщения (у меня 2 ~), и я вижу, что очередь AWS SQS растет в количестве сообщения и возраст.
~: Этот точный код работал нормально, когда у меня был один обработчик, эта проблема началась, когда я добавил второй.
Конфигурация / Код
Первое "ПРЕДУПРЕЖДЕНИЕ", которое я понимаю, вызвано валютой ThreadPoolTaskExecutor , но я не могу получить конфигурацию, которая работает должным образом. Вот моя текущая конфигурация для JMS, я пробовал различные уровни максимального размера пула без какого-либо реального влияния, кроме ран, начинающихся рано или поздно, в зависимости от размера пула
public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadGroupName("asyncConsumerTaskExecutor");
taskExecutor.setThreadNamePrefix("asyncConsumerTaskExecutor-thread");
taskExecutor.setCorePoolSize(10);
// Allow the thread pool to grow up to 4 times the core size, evidently not
// having the pool be larger than the max concurrency causes the JMS queue
// to barf on itself with messages like
// "Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers"
taskExecutor.setMaxPoolSize(10 * 4);
taskExecutor.setQueueCapacity(0); // do not queue up messages
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
return taskExecutor;
}
Вот JMS Container Factory мы создаем
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor asyncConsumerTaskExecutor) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(sqsConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
// The JMS processor will start 'concurrency' number of tasks
// and supposedly will increase this to the max of '10 * 3'
factory.setConcurrency(10 + "-" + (10 * 3));
factory.setTaskExecutor(asyncConsumerTaskExecutor);
// Let the task process 100 messages, default appears to be 10
factory.setMaxMessagesPerTask(100);
// Wait up to 5 seconds for a timeout, this keeps the task around a bit longer
factory.setReceiveTimeout(5000L);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
Я добавил setMaxMessagesPerTask & setReceiveTimeout вызовы, основанные на вещах, найденных в inte rnet, проблема сохраняется без них и при различных настройках (50, 2500L, 25, 1000L, et c ...)
Мы создаем фабрику соединений SQS по умолчанию
public SQSConnectionFactory sqsConnectionFactory(AmazonSQS amazonSQS) {
return new SQSConnectionFactory(new ProviderConfiguration(), amazonSQS);
}
Наконец обработчики выглядят как это
@JmsListener(destination = "consumer-event-queue")
public void receiveEvents(String message) throws IOException {
MyEventDTO myEventDTO = jsonObj.readValue(message, MyEventDTO.class);
//messageTask.process(myEventDTO);
}
@JmsListener(destination = "myalert-sqs")
public void receiveAlerts(String message) throws IOException, InterruptedException {
final MyAlertDTO myAlert = jsonObj.readValue(message, MyAlertDTO.class);
myProcessor.addAlertToQueue(myAlert);
}
Вы можете видеть в первой функции ( receiveEvents ), мы просто берем сообщение из очереди и завершаем работу, мы не реализовали код обработки для этого. Вторая функция ( receiveAlerts ) получает сообщение, функция myProcessor.addAlertToQueue создает работоспособный объект и передает s его в пул потоков для обработки в определенный момент в будущем.
Проблема только началась (предупреждение, информация и невозможность использования сообщений) началась только тогда, когда мы добавили функцию receiveAlerts , ранее другая функция была единственной присутствующей и мы не видели такого поведения.
Подробнее
Это часть более крупного проекта, и я работаю над разбиением этого кода на меньший тестовый пример, чтобы увидеть, Я могу продублировать эту проблему. Я опубликую продолжение с результатами.
В то же время
Я надеюсь, что это просто проблема конфигурации, и кто-то, кто более знаком с этим, может скажите мне, что я делаю неправильно, или что кто-то может высказать некоторые мысли и комментарии о том, как исправить это, чтобы работать должным образом.
Спасибо!