Я создаю поток, который опрашивает строки из базы данных по статусу, проверяет их и после этого объединяет в коллекцию. После обработки всего потока каждой строке присваивается соответствующий статус. Но когда я использую агрегатор со стратегией релиза TimeoutCountSequenceSizeReleaseStrategy
, а истекшее время очень мало, группа релизов не создается. И после этого следующий опрос происходит в другом потоке, но предыдущая группа сообщений не обрабатывается до тех пор, пока количество сообщений не достигнет цели (порога) в стратегии.
Код моего потока:
@Bean
public IntegrationFlow testFlow(EntityService entityService,
EntityValidator entityValidator,
EntityFlowProperties properties,
EntityChecker checker) {
return IntegrationFlows
.from(getMessageSource(entityService::getByStatus, properties.getMaxRowsPerPoll()),
e -> e.poller(getPollerSpec(properties)))
.split()
.transform(entityValidator::validate)
.filter(ValidationStatus<Entity>::isValid, filter ->
filter.discardFlow(flow -> flow.handle(entityService::handleValidationErrors)))
.transform(ValidationStatus<Entity>::getEntity)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(5, 10000)))
.transform(checker::checkOnSomething)
.split()
.transform(CheckResultAware<Entity>::getEntity)
.handle(entityService::saveAndChangeStatus)
.get();
Я ожидаю выполнить агрегацию в том же потоке, что и опрос, и не делать новый опрос, пока не закончится текущий поток.
Способ изменения статусов между опросом и агрегацией не подходит.
Есть ли способ сделать это?