Spring Integration завершает асинхронное преобразование перед следующим - PullRequest
1 голос
/ 04 февраля 2020

У меня есть поток интеграции, который регулярно опрашивает базу данных, чтобы извлечь любые MachineLine сущности, которые еще не были обработаны, и обработать их. Поток извлекает коллекцию MachineLine объектов, которые я хотел бы затем разделить на отдельные объекты, преобразовать эти объекты в ReportDetails объекты и сохранить преобразованные объекты в другую таблицу в базе данных. Процесс определяется следующим образом:

@Bean
public IntegrationFlow processMachineLine() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManager)
                            .entityClass(MachineLine.class)
                            .jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
                    e -> e.poller(Pollers.fixedDelay(5000)))
            .split()
            .transform(MachineLine.class, this::transformMachineLineToReportDetails)
            .handle(Jpa.outboundAdapter(this.entityManager)
                            .entityClass(ReportDetails.class),
                    ConsumerEndpointSpec::transactional)
            .get();
}

Приведенное выше определение работает нормально, но медленно. Метод transformMachineLineToReportDetails отправляет HTTP-запрос другой службе, для ответа на который требуется несколько секунд. С текущим определением потока каждый MachineLine объект ожидает преобразования и сохранения предыдущего объекта, прежде чем сделать то же самое с ними.

Таким образом, идеальным решением было бы выполнить это преобразование и постоянство асинхронно. Возможным решением было бы вставить следующую строку между .split() и .transform(...):

.channel(new ExecutorChannel(Executors.newCachedThreadPool()))

Однако это позволяет входящему адаптеру JPA снова опрашивать базу данных, прежде чем результаты последнего опроса обрабатываются и сохраняются. Это означает, что любые MachineLine сущности, возвращенные предыдущим опросом базы данных, которые не были преобразованы и сохранены до следующего опроса, будут извлечены во второй раз и будут пытаться преобразоваться и сохраниться во второй раз. Это, очевидно, вызывает ненужные затраты ресурсов, а также приводит к ошибке, когда несколько объектов ReportDetails с одинаковым идентификатором пытаются сохранить в базе данных.

Есть ли способ, которым я могу асинхронно преобразовать объекты MachineLine, но сделать Убедитесь, что база данных не опрошена снова, пока результаты предыдущего опроса не завершили свой путь в потоке (т.е. все MachineLine объекты преобразованы и сохранены)?

1 Ответ

1 голос
/ 04 февраля 2020

Единственный способ увидеть это через пользовательский AbstractMessageSourceAdvice против какого-то флага AtomicBoolean (тоже может быть бобом) для проверки beforeReceive(). Поскольку вы используете Pollers.fixedDelay(5000), ваша политика опроса все еще однопоточная. Поэтому мы не можем позволить одному и тому же потоку выполнить опрос против JPA, если это не разрешено AbstractMessageSourceAdvice. Логический флаг должен быть true в начале, и вы измените его на false до упомянутого split(). Вы можете сделать это, используя publishSubscribeChannel() в качестве двух подписчиков. Или даже сделать это в реализации AbstractMessageSourceAdvice - вроде compareAndSet(true, false) в этой реализации beforeReceive().

Затем вы разделяете и сохраняете после преобразования, как вы упомянули, используя ExecutorChannel.

В конце вашего потока вам нужно разместить publishSubscribeChannel() с двумя подписчиками - 1. handle(Jpa.outboundAdapter(this.entityManager); 2. aggregate() дождаться завершения всех разделенных элементов. После этого aggregate() вы помещаете простое handle(m -> pollingFlagBean().set(true)).

Вот и все: ваш новый опрос произойдет только тогда, когда все элементы будут обработаны и объединены обратно в группу. Только после этого вы позволяете опрашивать снова go, используя это AtomicBoolean.

Вы также можете рассмотреть возможность объединения этого флага logi c с SimpleActiveIdleMessageSourceAdvice, чтобы изменить период опроса между активным и пассивным режимы, позволяющие избежать большого простоя, когда вы ждете агрегации.

Любое другое асинхронное c решение все равно не будет работать для вас, поскольку переключение на другой поток немедленно завершит процесс опроса, чтобы позволить ему снова вращаться.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...