Наша система получает сообщения для извлечения данных из удаленного сервиса, а затем сохраняет их в базе данных.В настоящее время он открывает несколько соединений с базой данных, чтобы сохранить извлеченные данные для каждого запроса.Мы хотим преобразовать его в процесс с несколькими производителями (выборка данных из удаленного сервиса) и одним потребителем для сохранения данных в базе данных.При этом для сохранения данных в базе данных будет удерживаться самое большее одно соединение.
Мы используем подпружиненную загрузку с реактором.Мы хотим, чтобы издатель опубликовал все данные, извлеченные из удаленной службы, на которую мы можем подписаться, и поместил эти данные в пакет, скажем, 200 записей в базе данных.
Например, я планирую нам следоватькод для приема сообщений из очереди ActiveMQ:
public Publisher<Message<RestoreMessage>> restoreMessagesSource() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination(RestoreMessage.class.getSimpleName() + "Queue"))
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
В этом коде сообщение от ActiveMQ qeueu помещается в ReactivePublisher.Этот издатель был подписан.Таким образом, мы обрабатываем сообщения из очереди.
Аналогичным образом мы хотим, чтобы ответ всех удаленных API был передан издателю, который мы можем обработать в подписчике в одном месте.