Все эти основанные на очереди каналы в Spring Integration хранят сообщения в памяти только по умолчанию.Когда требуется постоянство, вы можете либо предоставить атрибут 'message-store
' в элементе 'queue
' для ссылки на постоянную реализацию MessageStore, либо вы можете заменить локальный канал тем, который поддерживается постоянным посредником, таким какканал с поддержкой JMS или канальный адаптер.Последний вариант позволяет использовать преимущества любого провайдера JMS для сохранения сообщений.
Можно настроить хранилище сообщений для любого QueueChannel
, добавив атрибут хранилища сообщений, как показано ниже.
Spring Integration обеспечивает поддержку шаблона хранилища сообщений путем а) определения интерфейса стратегии org.springframework.integration.store.MessageStore
, б) предоставления нескольких реализаций этого интерфейса и в) предоставления атрибута хранилища сообщений на всех компонентах, которые имеют возможность буферизовать сообщения таким образомчто вы можете внедрить любой экземпляр, который реализует интерфейс MessageStore.
Мой пример использует JDBC Message Store , но есть также несколько других доступных опций.
<bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${my.jdbc.driver}"/>
<property name="url" value="${my.jdbc.url}"/>
<property name="username" value="${my.jdbc.username}"/>
<property name="password" value="${my.jdbc.password}"/>
<property name="minEvictableIdleTimeMillis" value="300000"/>
<property name="timeBetweenEvictionRunsMillis" value="60000"/>
<property name="connectionProperties" value="SetBigStringTryClob=true;"/>
</bean>
<!-- The poller is needed by any of the QueueChannels -->
<integration:poller id="myPoller" fixed-rate="5000" default="true"/>
<!-- The MessageStore is needed to persist messages used by any of the QueueChannels -->
<int-jdbc:message-store id="myMessageStore" data-source="myDataSource" table-prefix="MY_INT_"/>
<!-- Main entry point into the process -->
<integration:gateway id="myGateway"
service-interface="com.mycompany.myproject.integration.gateways.myGateway"
default-request-channel="myGatewayChannel"
/>
<!-- Map the initial input channel to the first step, MyFirstService -->
<integration:channel id="myGatewayChannel">
<integration:queue capacity="1000"/>
<integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>
<!-- Step 1: My First Service -->
<integration:service-activator
id="myFirstServiceActivator"
input-channel="myGatewayChannel"
output-channel="myNextChannel"
ref="myFirstService"
method="process"/>
<!-- LONG running process. Setup asynchronous queue channel. -->
<integration:channel id="myNextChannel">
<integration:queue capacity="1000"/>
<integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>