DefaultMessageListenerContainer прекращает обработку сообщений - PullRequest
0 голосов
/ 03 февраля 2020

Я надеюсь, что это простая проблема конфигурации, но я не могу понять, что это может быть.

Настройка

  • Spring-Boor 2.2.2.RELEASE
  • облачный стартер
  • облачный стартер- aws
  • spring-jms
  • spring-cloud- зависимости Hoxton.SR1
  • amazon-sqs- java -messaging-lib 1.0.8

Проблема

Мое приложение запускается нормально и начинает обрабатывать сообщения от Amazon SQS. Через некоторое время я вижу следующее предупреждение

2020-02-01 04: 16: 21.482 LogLevel = WARN 1 --- [ecutor-thread14] osjlDefaultMessageListenerContainer: Количество запланированных потребителей упало ниже Ограничение concurrentConsumers, вероятно, из-за отклоненных задач. Проверьте конфигурацию пула потоков! Автоматическое восстановление c, которое будет инициировано оставшимися потребителями.

Вышеуказанное предупреждение будет напечатано несколько раз, и в итоге я вижу следующие два INFO сообщения

2020-02-01 04: 17: 51.552 LogLevel = INFO 1 --- [ecutor-thread40] c .asjavamessaging.SQSMessageConsumer: выключение исполнителя ConsumerPrefetch

2020-02-01 04:18 : 06.640 LogLevel = INFO 1 --- [ecutor-thread40] com.amazon.sqs.javamessaging.SQSSession: Выключение SessionCallBackScheduler executor

Вышеуказанные 2 сообщения будут отображаться несколько раз, а в какой-то момент нет больше сообщений потребляется из SQS. Я не вижу в своем журнале других сообщений, указывающих на проблему, но я не получаю сообщений от моих обработчиков о том, что они обрабатывают сообщения (у меня 2 ~), и я вижу, что очередь AWS SQS растет в количестве сообщения и возраст.

~: Этот точный код работал нормально, когда у меня был один обработчик, эта проблема началась, когда я добавил второй.

Конфигурация / Код

Первое "ПРЕДУПРЕЖДЕНИЕ", которое я понимаю, вызвано валютой ThreadPoolTaskExecutor , но я не могу получить конфигурацию, которая работает должным образом. Вот моя текущая конфигурация для JMS, я пробовал различные уровни максимального размера пула без какого-либо реального влияния, кроме ран, начинающихся рано или поздно, в зависимости от размера пула

    public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setThreadGroupName("asyncConsumerTaskExecutor");
        taskExecutor.setThreadNamePrefix("asyncConsumerTaskExecutor-thread");
        taskExecutor.setCorePoolSize(10);
        // Allow the thread pool to grow up to 4 times the core size, evidently not
        // having the pool be larger than the max concurrency causes the JMS queue
        // to barf on itself with messages like
        // "Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers"
        taskExecutor.setMaxPoolSize(10 * 4);
        taskExecutor.setQueueCapacity(0); // do not queue up messages
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        return taskExecutor;
    }

Вот JMS Container Factory мы создаем

    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor asyncConsumerTaskExecutor) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(sqsConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        // The JMS processor will start 'concurrency' number of tasks
        // and supposedly will increase this to the max of '10 * 3'
        factory.setConcurrency(10 + "-" + (10 * 3));
        factory.setTaskExecutor(asyncConsumerTaskExecutor);
        // Let the task process 100 messages, default appears to be 10
        factory.setMaxMessagesPerTask(100);
        // Wait up to 5 seconds for a timeout, this keeps the task around a bit longer
        factory.setReceiveTimeout(5000L);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }

Я добавил setMaxMessagesPerTask & setReceiveTimeout вызовы, основанные на вещах, найденных в inte rnet, проблема сохраняется без них и при различных настройках (50, 2500L, 25, 1000L, et c ...)

Мы создаем фабрику соединений SQS по умолчанию

    public SQSConnectionFactory sqsConnectionFactory(AmazonSQS amazonSQS) {
        return new SQSConnectionFactory(new ProviderConfiguration(), amazonSQS);
    }

Наконец обработчики выглядят как это

    @JmsListener(destination = "consumer-event-queue")
    public void receiveEvents(String message) throws IOException {
        MyEventDTO myEventDTO = jsonObj.readValue(message, MyEventDTO.class);
        //messageTask.process(myEventDTO);
    }

    @JmsListener(destination = "myalert-sqs")
    public void receiveAlerts(String message) throws IOException, InterruptedException {
        final MyAlertDTO myAlert = jsonObj.readValue(message, MyAlertDTO.class);
        myProcessor.addAlertToQueue(myAlert);
    }

Вы можете видеть в первой функции ( receiveEvents ), мы просто берем сообщение из очереди и завершаем работу, мы не реализовали код обработки для этого. Вторая функция ( receiveAlerts ) получает сообщение, функция myProcessor.addAlertToQueue создает работоспособный объект и передает s его в пул потоков для обработки в определенный момент в будущем.

Проблема только началась (предупреждение, информация и невозможность использования сообщений) началась только тогда, когда мы добавили функцию receiveAlerts , ранее другая функция была единственной присутствующей и мы не видели такого поведения.

Подробнее

Это часть более крупного проекта, и я работаю над разбиением этого кода на меньший тестовый пример, чтобы увидеть, Я могу продублировать эту проблему. Я опубликую продолжение с результатами.

В то же время

Я надеюсь, что это просто проблема конфигурации, и кто-то, кто более знаком с этим, может скажите мне, что я делаю неправильно, или что кто-то может высказать некоторые мысли и комментарии о том, как исправить это, чтобы работать должным образом.

Спасибо!

1 Ответ

0 голосов
/ 14 февраля 2020

После некоторой борьбы с этим, я думаю, что наконец-то решил его.

Проблема, по-видимому, связана с "DefaultJmsListenerContainerFactory", эта фабрика создает новый "DefaultJmsListenerContainer" для EACH метод с аннотацией @JmsListener. Человек, который первоначально написал код, думал, что он был вызван только один раз для приложения, и созданный контейнер будет использован повторно. Таким образом, проблема была двукратной

  1. В ThreadPoolTaskExecutor, прикрепленном к фабрике, было 40 потоков, когда у приложения был 1 метод @JmsListener, это работало нормально, но когда мы добавили второй метод, каждый метод получил 10 потоков (всего 20) для прослушивания. Это хорошо, однако; так как мы заявили, что каждый слушатель может вырасти до 30 слушателей, у нас быстро закончились потоки в пуле, упомянутом в 1 выше. Это вызвало ошибку « Количество запланированных потребителей ниже предела concurrentConsumers »
  2. Это, вероятно, очевидно, учитывая вышесказанное, но я хотел явно это вызвать. Однако на фабрике слушателей мы устанавливаем параллелизм равным 10-30; все слушатели должны разделить этот пул. Таким образом, максимальный параллелизм должен быть настроен так, чтобы максимальное значение каждого слушателя было достаточно маленьким, чтобы, если каждый слушатель создает свой максимум, он не превышал максимальное количество потоков в пуле (например, если у нас есть 2 '@JmsListener 'аннотированные методы и пул с 40 потоками, тогда максимальное значение может быть не более 20).

Надеюсь, это может помочь кому-то еще с подобной проблемой в будущем ....

...