Медленное потребление сообщений с помощью AmazonSQSClient - PullRequest
0 голосов
/ 11 декабря 2018

Итак, я использовал параллелизм весной jms 50-100, разрешив максимальное количество подключений до 200. Все работает, как и ожидалось, но если я пытаюсь получить 100k сообщений из очереди, я имею в виду, что на моих sqs есть 100k сообщений, и я их читаючерез пружину jms нормальный подход.

@JmsListener
Public void process (String message) {
count++;
Println (count);
//code
 }

Я вижу все журналы в моей консоли, но после примерно 17k он начинает выдавать исключения

Что-то вроде: aws sdk exception: port уже используется.

Почему я вижу это исключение и как это сделать.Я от этого избавляюсь?

Я пытался поискать это в интернете.Ничего не удалось найти.

Моя настройка:

Параллелизм 50-100

Задать сообщения для задачи: 50

Клиент подтвердил

timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect

Обновление: я искал проблему, и кажется, что новые сокеты создаются, пока не будут исчерпаны все сокеты.

Моя весенняя версия jms будет 4.3.10

. Чтобы повторить эту проблему, просто выполните описанную выше конфигурацию с максимальным соединением 200 и валютой 50-100 и отправьте несколько сообщений по 40 КБ вsqs queue .. https://github.com/adamw/elasticmq можно использовать в качестве локального стекового сервера, который реплицирует Amazon sqs.Прокомментируйте jms listener и используйте нагрузочное тестирование soap ui и вызовите send message, чтобы запустить много сообщений.Только потому, что вы прокомментировали аннотацию @jmslistener, она не будет принимать сообщения из очереди.Как только вы увидите, что отправили 40 тыс. Сообщений, остановитесь.Раскомментируйте @jmslistener и перезапустите сервер.

Обновление:

DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setErrorHandler(Throwable::printStackTrace);
        factory.setConcurrency("50-100");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;

Обновление:

SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);

Обновление:

Сведения о конфигурации клиента:

Protocol : HTTP
Max connections : 200

Обновление:

Я использовал класс фабрики соединений с кешем и похоже.Я прочитал о переполнении стека и в их официальной документации, чтобы не использовать класс фабрики соединений с кешем и фабрику контейнеров слушателя jms по умолчанию.

https://stackoverflow.com/a/21989895/5871514

Это выдает ту же ошибку, что и раньше.

обновление

Моя цель - получить 500 т / с, то есть я должен иметь возможность потреблять так много .. Итак, я попробовал этот метод, и кажется, что я могу достичь100-200, но не более того ... Плюс эта штука является блокирующим при высоком параллелизме .. Если вы используете это .. Если у вас есть какое-то лучшее решение для достижения этого ... Я весь слух.

** обновлено **

Я использую amazonsqsclient

1 Ответ

0 голосов
/ 21 декабря 2018

Истощение на потребителе

Одной из возможных оптимизаций, которую стремятся реализовать клиенты JMS, является буфер потребления сообщений или «предварительная выборка».Этот буфер иногда настраивается по количеству сообщений или по размеру буфера в байтах.

Цель состоит в том, чтобы предотвратить переход потребителя на сервер каждый раз, когда он получает сообщения, вместо того, чтобы извлекать несколько сообщений в пакете.

В среде, где у вас много "быстрых"потребителям »(что является мнением, которое могут принимать эти библиотеки), эта предварительная выборка установлена ​​на несколько высокий уровень по умолчанию, чтобы минимизировать эти циклы.

Однако в среде с медленнойДля потребителей сообщений такая предварительная выборка может стать проблемой.Медленный потребитель задерживает потребление сообщений для тех предварительно выбранных сообщений от более быстрого потребителя.В среде с высокой степенью одновременности это может быстро вызвать голодание.

В этом случае SQSConnectionFactory имеет свойство для этого :

SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), amazonSQSclient);
sqsConnectionFactory.setNumberOfMessagesToPrefetch(0);

Голодание на производителе (т. Е. Через JmsTemplate)

Для этих реализаций JMS очень характерно ожидать взаимодействия с брокером через какого-либо посредника.Эти посредники фактически кешируют и повторно используют соединения или используют механизм объединения для их повторного использования.В мире Java EE об этом обычно заботятся об адаптере JCA или другом методе на сервере Java EE.

Из-за того, как работает Spring JMS, он ожидает, что существует промежуточный делегат для ConnectionFactoryсделать это кэширование / пул.В противном случае, когда Spring JMS хочет подключиться к брокеру, он попытается открыть новое подключение и сеанс (!) каждый раз, когда вы захотите что-то сделать с брокером.

РешитьЭто, Spring предоставляет несколько вариантов.Самым простым из них является CachingConnectionFactory, который кэширует один Connection и позволяет открывать множество Session с этим Connection.Простой способ добавить это к вашему @Configuration выше будет выглядеть примерно так:

@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {

    SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

    // Doing the following is key!
    CachingConnectionFactory connectionfactory = new CachingConnectionFactory();
    connectionfactory.setTargetConnectionFactory(sqsConnectionFactory);
    // Set the #connectionfactory properties to your liking here...

    return connectionFactory;

}

Если вы хотите что-то более изящное в качестве решения для JMS-пула (которое объединит Connections и MessageProducer s дляВы, в дополнение к нескольким Session s), можете использовать разумно новый проект PooledJMS * JmsPoolConnectionFactory или т.п. из их библиотеки.

...