Опрос потоков JDBC: входящий канал-адаптер отката транзакции, но MQ сообщение отправлено - PullRequest
0 голосов
/ 10 ноября 2019

Я опрашиваю 50 записей БД , используя jdbc: адаптер входящего канала как транзакционный . Это означает, что все 50 записей будут обработаны в одной (той же) транзакции . И я хочу, чтобы они обрабатывались в одной и той же транзакции.

В примере кода ProcessIndividualRecord :: updateDB () обновляет статус в БД, а после этого messagesJMSChannel отправляет сообщение в MQ.

Если в метод updateDB происходит обработка, происходит исключение *1020* при обработке 50-й записи , то инфраструктура выполняет откат всех ранее обновленных 49 записей и управление переходит к ProcessorExceptionHandler с 50-й записью .

Моя задача заключается в том, что " messagesJMSChannel " уже отправлено предыдущим 49записи в MQ.

Как их откатить?

<int-jdbc:inbound-channel-adapter
    id="initial.poller"
    query="${poller.get}"
    update="${poller.update}"
    max-rows="${poller.maxRow:50}"
    row-mapper="pollerRowMapper"
    data-source="dataSource" channel="deliveryContactTypeChannel">
        <int:transactional/>
</int-jdbc:inbound-channel-adapter>

<int:channel id="deliveryContactTypeChannel" />

<int:splitter id="splitDeliveryContactType"
    ref="deliveryContactTypeMessageProcessor" method="handleJdbcMessage"
    input-channel="deliveryContactTypeChannel"
    output-channel="processIndividualRecordChannel" />

<int:service-activator id="statusUpdate"
    input-channel="processIndividualRecordChannel"
    output-channel="notificationJMSChannel"
    ref="processIndividualRecord" method="updateDB" />

<!-- send MQ message -->
<int:channel id="notificationJMSChannel"></int:channel>

<int-jms:outbound-channel-adapter
    id="jmsOut" channel="notificationJMSChannel" 
    destination="senderTopic" jms-template="jmsQueueTemplate">
</int-jms:outbound-channel-adapter>

<!-- Error handling -->
<int:channel id="jmsErrorChannel" />

<int:header-enricher id="errorMsg.HeaderEnricher"
    input-channel="errorChannel"
    output-channel="jmsErrorChannel">
    <int:header name="failpayload" expression="payload.failedMessage" />
</int:header-enricher>

<int:service-activator
    input-channel="jmsErrorChannel" method="handleException">
    <bean class="com.digital.alerts.integration.ProcessorExceptionHandler" />
</int:service-activator>

1 Ответ

1 голос
/ 10 ноября 2019

Вам следует рассмотреть возможность использования JtaTransactionManager, если вы имеете дело с контейнером JEE, или цепочка Менеджеры транзакций DataSource и JMS: https://www.javaworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html.

См. ChainedTransactionManager реализация в Spring Data:

/**
 * {@link PlatformTransactionManager} implementation that orchestrates transaction creation, commits and rollbacks to a
 * list of delegates. Using this implementation assumes that errors causing a transaction rollback will usually happen
 * before the transaction completion or during the commit of the most inner {@link PlatformTransactionManager}.
 * <p />
 * The configured instances will start transactions in the order given and commit/rollback in <em>reverse</em> order,
 * which means the {@link PlatformTransactionManager} most likely to break the transaction should be the <em>last</em>
 * in the list configured. A {@link PlatformTransactionManager} throwing an exception during commit will automatically
 * cause the remaining transaction managers to roll back instead of committing.
 *
 * @author Michael Hunger
 * @author Oliver Gierke
 * @since 1.6
 */
public class ChainedTransactionManager implements PlatformTransactionManager {

Сейчас похоже, что каждое отдельное сообщение отправляется в собственной локальной транзакции JMS. И это полностью не знает, что у вас есть внешняя транзакция JDBC.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...