Отправка сообщений в асинхронный сервис с транзакциями - PullRequest
1 голос
/ 17 февраля 2012

У меня есть постоянная транзакционная очередь, содержащая сообщения, которые мне нужно отправить по асинхронному протоколу.Каждое сообщение должно быть отправлено в отдельной транзакции, но количество сообщений в рейсе в данный момент времени не позволяет использовать thread-per-message , в то время как требования к пропускной способности не позволяют сохранять промежуточные состояния.

Глядя на код для JmsTransactionManager, я вижу, что он использует TransactionSynchronizationManager, который хранит ресурсы транзакции в ThreadLocal.Поэтому мне кажется, что мне нужно будет реализовать PlatformTransactionManager, чтобы каким-то образом обрабатывать несколько транзакций в одном потоке.Это кажется немного экстремальным ...

Существует ли какое-то расположение модулей Spring Integration, которое устранит эту сложность?Стоит ли искать информацию JTA / XA?

1 Ответ

1 голос
/ 08 марта 2012

Все эти основанные на очереди каналы в Spring Integration хранят сообщения в памяти только по умолчанию.Когда требуется постоянство, вы можете либо предоставить атрибут 'message-store' в элементе 'queue' для ссылки на постоянную реализацию MessageStore, либо вы можете заменить локальный канал тем, который поддерживается постоянным посредником, таким какканал с поддержкой JMS или канальный адаптер.Последний вариант позволяет использовать преимущества любого провайдера JMS для сохранения сообщений.

Можно настроить хранилище сообщений для любого QueueChannel, добавив атрибут хранилища сообщений, как показано ниже.

Spring Integration обеспечивает поддержку шаблона хранилища сообщений путем а) ​​определения интерфейса стратегии org.springframework.integration.store.MessageStore, б) предоставления нескольких реализаций этого интерфейса и в) предоставления атрибута хранилища сообщений на всех компонентах, которые имеют возможность буферизовать сообщения таким образомчто вы можете внедрить любой экземпляр, который реализует интерфейс MessageStore.

Мой пример использует JDBC Message Store , но есть также несколько других доступных опций.

<bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="${my.jdbc.driver}"/>
    <property name="url" value="${my.jdbc.url}"/>
    <property name="username" value="${my.jdbc.username}"/>
    <property name="password" value="${my.jdbc.password}"/>
    <property name="minEvictableIdleTimeMillis" value="300000"/>
    <property name="timeBetweenEvictionRunsMillis" value="60000"/>
    <property name="connectionProperties" value="SetBigStringTryClob=true;"/>
</bean>

<!-- The poller is needed by any of the QueueChannels -->
<integration:poller id="myPoller" fixed-rate="5000" default="true"/>

<!-- The MessageStore is needed to persist messages used by any of the QueueChannels -->
<int-jdbc:message-store id="myMessageStore" data-source="myDataSource" table-prefix="MY_INT_"/>

<!-- Main entry point into the process -->
<integration:gateway id="myGateway"
                     service-interface="com.mycompany.myproject.integration.gateways.myGateway"
                     default-request-channel="myGatewayChannel"
        />

<!-- Map the initial input channel to the first step, MyFirstService -->
<integration:channel id="myGatewayChannel">
    <integration:queue capacity="1000"/>
    <integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>

<!-- Step 1: My First Service -->
<integration:service-activator
        id="myFirstServiceActivator"
        input-channel="myGatewayChannel"
        output-channel="myNextChannel"
        ref="myFirstService"
        method="process"/>    

<!-- LONG running process. Setup asynchronous queue channel. -->
<integration:channel id="myNextChannel">
    <integration:queue capacity="1000"/>
    <integration:queue message-store="myMessageStore" capacity="1000"/>
</integration:channel>    
...