У меня есть следующий поток, который я хотел бы реализовать с помощью Spring Integration Java DSL:
- Опросить таблицу в базе данных каждые 2 часа, которая возвращает идентификатор документов, которые необходимо обработать
- Для каждого идентификатора обработать документ через шлюз HTTP
- Сохранить ответ в базе данных
У меня есть рабочий Java код, который выполняет именно эти функции шаги. Дополнительное требование, с которым я борюсь, заключается в том, что опрос для следующего раунда документов не должен происходить, пока все документы с последнего опроса не будут обработаны и сохранены в базе данных.
Есть ли какой-либо шаблон в Spring Integration, что я мог бы использовать для этого дополнительного требования?
Вот упрощенный код - он станет более сложным, и я разделю обработку документов (HTTP исходящих и постоянных) на отдельные классы / потоки:
return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
.handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
.entityClass(DocumentHeader.class)
.jpaQuery("from DocumentHeader d where d.modified > :modified")
.parameterExpression("modified", "payload"))
.handle(Http.outboundGateway(uri)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class))
.handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional(true))
.get();
ОБНОВЛЕНИЕ
Следуя предложению Артема, я пытаюсь реализовать его, используя SimpleActiveIdleMessageSourceAdvice
class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {
public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
super(trigger);
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return false;
}
}
Если я правильно понимаю, приведенный выше код остановит опрос , Теперь я понятия не имею, как прикрепить этот совет к Jpa.inboundAdapter ... Кажется, у него нет правильного метода (ни совет, ни Spe c обработчик). Я что-то упускаю здесь очевидное? Я пытался прикрепить Совет к Jpa.retrievingGateway , но он совсем не меняет поток.