Как мне продолжить агрегацию в одном потоке с опросом? - PullRequest
0 голосов
/ 24 апреля 2019

Я создаю поток, который опрашивает строки из базы данных по статусу, проверяет их и после этого объединяет в коллекцию. После обработки всего потока каждой строке присваивается соответствующий статус. Но когда я использую агрегатор со стратегией релиза TimeoutCountSequenceSizeReleaseStrategy, а истекшее время очень мало, группа релизов не создается. И после этого следующий опрос происходит в другом потоке, но предыдущая группа сообщений не обрабатывается до тех пор, пока количество сообщений не достигнет цели (порога) в стратегии.

Код моего потока:

@Bean
public IntegrationFlow testFlow(EntityService entityService,
                                EntityValidator entityValidator,
                                EntityFlowProperties properties,                                       
                                EntityChecker checker) {
    return IntegrationFlows
            .from(getMessageSource(entityService::getByStatus, properties.getMaxRowsPerPoll()),
                    e -> e.poller(getPollerSpec(properties)))
            .split()
            .transform(entityValidator::validate)
            .filter(ValidationStatus<Entity>::isValid, filter ->
                    filter.discardFlow(flow -> flow.handle(entityService::handleValidationErrors)))
            .transform(ValidationStatus<Entity>::getEntity)
            .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(5, 10000)))
            .transform(checker::checkOnSomething)
            .split()
            .transform(CheckResultAware<Entity>::getEntity)
            .handle(entityService::saveAndChangeStatus)
            .get();

Я ожидаю выполнить агрегацию в том же потоке, что и опрос, и не делать новый опрос, пока не закончится текущий поток.

Способ изменения статусов между опросом и агрегацией не подходит.

Есть ли способ сделать это?

1 Ответ

0 голосов
/ 24 апреля 2019

Зачем вам нужно TimeoutCountSequenceSizeReleaseStrategy;ваши последовательности конечны;просто используйте значение по умолчанию SimpleSequenceSizeReleaseStrategy.

Однако TimeoutCountSequenceSizeReleaseStrategy все равно должен выпускаться в зависимости от размера последовательности.

Но это не совсем подходит для вашего варианта использования, потому что вы можете оставить счастичная группа в магазине.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...