Как реализовать очередь сообщений с помощью Spring Integration и MongoDB? - PullRequest
0 голосов
/ 25 апреля 2018

Как настроить Spring Integration, чтобы обработанные сообщения удалялись из коллекции.В консоли MongoDB я могу просто позвонить:

db.messages.findAndModify({ remove:true })

, но в MongoDbMessageSource только читает сообщения

mongoTemplate.find(..)

Я полагаю, это можно сделать с помощью некоторого удаления в транзакции.Но я не смог найти простое хорошее решение.

Входящая часть моей конфигурации:

@Bean
@Autowired
public IntegrationFlow pollMessages(MongoDbFactory mongoDbFactory, SomeService someService) {
    return IntegrationFlows.from(
            mongoMessageSource(mongoDbFactory),
            c -> c.poller(Pollers.fixedDelay(1, TimeUnit.SECONDS)))
            .handle(someService, "process")
            .get();
}

@Bean
@Autowired
public MongoDbMessageSource mongoMessageSource(MongoDbFactory mongo) {
    MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
    messageSource.setEntityClass(MessageEntity.class);
    messageSource.setCollectionNameExpression(new LiteralExpression("messages"));

    return messageSource;
}

1 Ответ

0 голосов
/ 25 апреля 2018

Это верно.Чтобы выполнить такое требование, вам нужно взглянуть на:

/**
 * Specify the {@link TransactionSynchronizationFactory} to attach a
 * {@link org.springframework.transaction.support.TransactionSynchronization}
 * to the transaction around {@code poll} operation.
 * @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
 * @return the spec.
 */
public PollerSpec transactionSynchronizationFactory(
        TransactionSynchronizationFactory transactionSynchronizationFactory) {

И действительно выполнить удаление из коллекции в TransactionSynchronizationProcessor.processAfterCommit().

См. Справочное руководство для получения дополнительной информации.

Для конфигурации XML у нас есть этот тестовый пример:

<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
                                     channel="replyChannel"
                                     query="{'name' : 'Bob'}"
                                     auto-startup="false">

    <int:poller fixed-delay="200" max-messages-per-poll="1">
        <int:advice-chain  synchronization-factory="syncFactory">
            <bean
                    class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
            <tx:advice>
                <tx:attributes>
                    <tx:method name="*" />
                </tx:attributes>
            </tx:advice>
        </int:advice-chain>
    </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:before-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />

Нечто подобное можно сделать и с Java DSL.

Вам нужноDefaultTransactionSynchronizationFactory и ExpressionEvaluatingTransactionSynchronizationProcessor для настройки по данному вопросу.Да, можно использовать тот же PseudoTransactionManager.

Хотя вы также можете рассмотреть возможность вызова remove/update вручную в конце потока.

...