Погрузить в http ответы удаленной службы издателю - PullRequest
0 голосов
/ 02 июня 2019

Наша система получает сообщения для извлечения данных из удаленного сервиса, а затем сохраняет их в базе данных.В настоящее время он открывает несколько соединений с базой данных, чтобы сохранить извлеченные данные для каждого запроса.Мы хотим преобразовать его в процесс с несколькими производителями (выборка данных из удаленного сервиса) и одним потребителем для сохранения данных в базе данных.При этом для сохранения данных в базе данных будет удерживаться самое большее одно соединение.

Мы используем подпружиненную загрузку с реактором.Мы хотим, чтобы издатель опубликовал все данные, извлеченные из удаленной службы, на которую мы можем подписаться, и поместил эти данные в пакет, скажем, 200 записей в базе данных.

Например, я планирую нам следоватькод для приема сообщений из очереди ActiveMQ:

    public Publisher<Message<RestoreMessage>> restoreMessagesSource() {
        return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                .destination(RestoreMessage.class.getSimpleName() + "Queue"))
            .channel(MessageChannels.queue())
            .log(LoggingHandler.Level.DEBUG)
            .log()
            .toReactivePublisher();
    }

В этом коде сообщение от ActiveMQ qeueu помещается в ReactivePublisher.Этот издатель был подписан.Таким образом, мы обрабатываем сообщения из очереди.

Аналогичным образом мы хотим, чтобы ответ всех удаленных API был передан издателю, который мы можем обработать в подписчике в одном месте.

1 Ответ

0 голосов
/ 03 июня 2019

Похоже, у вас будет несколько Publisher<Message<?>>, и вы хотите использовать их все в одном подписчике.По этой причине вы можете использовать:

/**
 * Merge data from {@link Publisher} sequences contained in an array / vararg
 * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat},
 * sources are subscribed to eagerly.
 * <p>
 * <img class="marble" src="doc-files/marbles/mergeFixedSources.svg" alt="">
 * <p>
 * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
 * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
 * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
 * another source.
 *
 * @param sources the array of {@link Publisher} sources to merge
 * @param <I> The source type of the data sequence
 *
 * @return a merged {@link Flux}
 */
@SafeVarargs
public static <I> Flux<I> merge(Publisher<? extends I>... sources) {

Итак, вы собираетесь свести все свои источники к одному Flux и подпишитесь на этот.

Обратите внимание на Note..toReactivePublisher() действительно производит бесконечный источник , хотя, согласно Jms.messageDrivenChannelAdapter(), это делается в его конкретном потоке от исполнителя в контейнере слушателя.Итак, попробуйте как есть или оберните каждый источник в Flux с определенным publishOn().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...