У меня есть поток интеграции, который регулярно опрашивает базу данных, чтобы извлечь любые MachineLine
сущности, которые еще не были обработаны, и обработать их. Поток извлекает коллекцию MachineLine
объектов, которые я хотел бы затем разделить на отдельные объекты, преобразовать эти объекты в ReportDetails
объекты и сохранить преобразованные объекты в другую таблицу в базе данных. Процесс определяется следующим образом:
@Bean
public IntegrationFlow processMachineLine() {
return IntegrationFlows
.from(Jpa.inboundAdapter(this.entityManager)
.entityClass(MachineLine.class)
.jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
e -> e.poller(Pollers.fixedDelay(5000)))
.split()
.transform(MachineLine.class, this::transformMachineLineToReportDetails)
.handle(Jpa.outboundAdapter(this.entityManager)
.entityClass(ReportDetails.class),
ConsumerEndpointSpec::transactional)
.get();
}
Приведенное выше определение работает нормально, но медленно. Метод transformMachineLineToReportDetails
отправляет HTTP-запрос другой службе, для ответа на который требуется несколько секунд. С текущим определением потока каждый MachineLine
объект ожидает преобразования и сохранения предыдущего объекта, прежде чем сделать то же самое с ними.
Таким образом, идеальным решением было бы выполнить это преобразование и постоянство асинхронно. Возможным решением было бы вставить следующую строку между .split()
и .transform(...)
:
.channel(new ExecutorChannel(Executors.newCachedThreadPool()))
Однако это позволяет входящему адаптеру JPA снова опрашивать базу данных, прежде чем результаты последнего опроса обрабатываются и сохраняются. Это означает, что любые MachineLine
сущности, возвращенные предыдущим опросом базы данных, которые не были преобразованы и сохранены до следующего опроса, будут извлечены во второй раз и будут пытаться преобразоваться и сохраниться во второй раз. Это, очевидно, вызывает ненужные затраты ресурсов, а также приводит к ошибке, когда несколько объектов ReportDetails
с одинаковым идентификатором пытаются сохранить в базе данных.
Есть ли способ, которым я могу асинхронно преобразовать объекты MachineLine
, но сделать Убедитесь, что база данных не опрошена снова, пока результаты предыдущего опроса не завершили свой путь в потоке (т.е. все MachineLine
объекты преобразованы и сохранены)?