Оценить ограничение сообщений с помощью интеграции Spring - PullRequest
0 голосов
/ 26 июня 2018

Я новичок в Spring-интеграции.

У нас есть приложение REST, которое получает слишком много сообщений (6000 сообщений в минуту), чем то, что может обрабатывать база данных. Поэтому я хочу ограничить количество запросов до 500 сообщений в 15 секунд (2000 в минуту). Для этого я использовал канал очереди.

Приложение создает более 30000 потоков Java через некоторое время. Кроме того, канал очереди содержит больше сообщений, чем указано в емкости очереди.

Как уменьшить количество потоков и ограничить количество сообщений в очереди?

Контекст интеграции xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-5.0.xsd">

    <!-- Endpoint -->   
    <int:gateway service-interface="com.ratelimiter.PrintGateway" default-request-channel="inputChannel">
        <int:method name="print"/>  
    </int:gateway>

    <!-- Channel -->
    <int:channel id="inputChannel">
        <int:queue capacity="30000"/>
    </int:channel>

    <!-- Endpoint -->   
    <int:service-activator ref="receiver" input-channel="inputChannel" method="save">
        <int:poller fixed-rate="15" time-unit="SECONDS" max-messages-per-poll="500"></int:poller>
    </int:service-activator>

    <!--  Spring Bean -->
    <bean id="receiver" class="com.ratelimiter.saveToDataStore"/>

</beans>

Интерфейс PrintGateway:

public interface PrintGateway {

    public Future<Message<String>> print(Message<?> message);
}

1 Ответ

0 голосов
/ 26 июня 2018

Поскольку ваша подпись шлюза должна возвращать Future<Message<String>>, это рассматривается как асинхронный шлюз: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#async-gateway

По умолчанию используется

private volatile AsyncTaskExecutor asyncExecutor = new SimpleAsyncTaskExecutor();

Который действительно раскручивает новую ветку для каждого нового сообщения. И что важно: он ждет ответа, чтобы выполнить это Future. Согласно вашему коду никакого ответа не будет, поэтому ваши потоки в шлюзе очень долго ничего не ждут.

Вам следует рассмотреть возможность изменения подписи вашего шлюза на тип возврата void. Таким образом, вы действительно отправите и забудете. Не будет никаких фоновых дополнительных тем ни за что.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...