Spring Integration: запускать опрос JPA только после обработки всех результатов последнего опроса - PullRequest
1 голос
/ 22 апреля 2020

У меня есть следующий поток, который я хотел бы реализовать с помощью Spring Integration Java DSL:

  1. Опросить таблицу в базе данных каждые 2 часа, которая возвращает идентификатор документов, которые необходимо обработать
  2. Для каждого идентификатора обработать документ через шлюз HTTP
  3. Сохранить ответ в базе данных

У меня есть рабочий Java код, который выполняет именно эти функции шаги. Дополнительное требование, с которым я борюсь, заключается в том, что опрос для следующего раунда документов не должен происходить, пока все документы с последнего опроса не будут обработаны и сохранены в базе данных.

Есть ли какой-либо шаблон в Spring Integration, что я мог бы использовать для этого дополнительного требования?

Вот упрощенный код - он станет более сложным, и я разделю обработку документов (HTTP исходящих и постоянных) на отдельные классы / потоки:

return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
        .handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
                .entityClass(DocumentHeader.class)
                .jpaQuery("from DocumentHeader d where d.modified > :modified")
                .parameterExpression("modified", "payload"))
        .handle(Http.outboundGateway(uri)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class))
        .handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
                        .entityClass(ProcessingMetadata.class)
                        .persistMode(PersistMode.PERSIST),
                e -> e.transactional(true))
        .get();

ОБНОВЛЕНИЕ

Следуя предложению Артема, я пытаюсь реализовать его, используя SimpleActiveIdleMessageSourceAdvice

class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {

    public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return false;
    }
}

Если я правильно понимаю, приведенный выше код остановит опрос , Теперь я понятия не имею, как прикрепить этот совет к Jpa.inboundAdapter ... Кажется, у него нет правильного метода (ни совет, ни Spe c обработчик). Я что-то упускаю здесь очевидное? Я пытался прикрепить Совет к Jpa.retrievingGateway , но он совсем не меняет поток.

1 Ответ

1 голос
/ 22 апреля 2020

Сегодня я ответил на аналогичный вопрос: Как выполнить опрос из сообщения очереди 1 за один раз после завершения нисходящего потока в Spring Integration .

У вас также может быть хитрость Уровень базы данных не позволяет видеть новые записи в таблице, в то время как другие заблокированы. Или у вас может быть UPDATE в конце потока, в то время как ваш SELECT не увидит соответствующие записи, пока они не будут обновлены соответственно.

Но в любом случае любой из тех подходов, которые я предлагаю для этого вопроса, должен быть применен и здесь.

Также вы действительно можете полагаться на SimpleActiveIdleMessageSourceAdvice, поскольку ваше решение уже основано на реализации MessageSource.

ОБНОВЛЕНИЕ

Для вашего варианта использования, вероятно, было бы лучше расширить этот SimpleActiveIdleMessageSourceAdvice и переопределить его beforeReceive(), чтобы проверить состояние, в котором вы можете читать больше данных или нет. idlePollPeriod и activePollPeriod могут иметь одно и то же значение: не похоже, что имеет смысл менять их между ними, поскольку вы переходите в состояние ожидания сразу после чтения следующего набора данных.

Для проверки состояния это может быть простой AtomicBoolean компонент, который вы должны изменить после обработки текущего набора документов. Это может быть что-то после агрегатора или что-то еще, что вы можете использовать в своем решении.

ОБНОВЛЕНИЕ 2

Чтобы использовать WaitUntilCompleted для вашего Jpa.inboundAdapter, вам следует иметь конфигурацию, подобную этой:

IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
            .entityClass(ProcessingMetadata.class)
            .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                    "where p.status = com.test.ProcessingStatus.PROCESSED")
            .maxResults(1)
            .expectSingleResult(true),
    e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))

Обратите внимание на .advice(waitUntilCompleted()), который является частью конфигурации pller и указывает на ваш бин совета.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...