Многопоточный канал Executor для ускорения потребительского процесса - PullRequest
0 голосов
/ 27 января 2020

У меня есть производитель сообщений, который выдает около 15 сообщений в секунду

Потребитель - это весенний проект интеграции, который использует очередь сообщений и много обрабатывает. В настоящее время он является однопоточным и не может соответствовать скорости, с которой производитель отправляет сообщения. следовательно, глубина очереди продолжает увеличиваться

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
                .aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {

                    boolean eonExists = g.getMessages().stream()
                            .anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
                    if (eonExists) {
                        boolean einExists = g.getMessages().stream()
                                .anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
                        if (einExists) {
                            return true;
                        }
                    }
                    return false;
                }).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();

. Можно ли использовать канал исполнителя для обработки этого в многопоточной среде и ускорить процесс потребителя

Если да, пожалуйста, предложите, как я могу достижения - Для обеспечения порядка сообщений мне нужно назначить сообщения одного типа (на основе идентификатора сообщения) одной и той же ветке канала исполнителя.

[ОБНОВЛЕННЫЙ КОД] Я создал следующие каналы исполнителя

    public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
            .executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

    public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
            .executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

Теперь из основного потока сообщений я перенаправил на специальный маршрутизатор, который перенаправляет сообщение в канал исполнителя, как показано ниже -

    @Bean
    public IntegrationFlow baseEventFlow1() {

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").route(route()).get();
    }

    public AbstractMessageRouter router() {
        return new AbstractMessageRouter() {
            @Override
            protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
                if (message.getPayload().toString().contains("\"id\":\"RPA")) {
                    return Collections.singletonList(RPA_DEFAULT_CHANNEL);
                } else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
                    return Collections.singletonList(SKW_DEFAULT_CHANNEL);
                } else {
                    return Collections.singletonList(new NullChannel());
                }
            }

        };
    }

I будет иметь индивидуальный поток потребителей для соответствующего канала исполнителя.

Пожалуйста, исправьте мое понимание

[ОБНОВЛЕНО]

    @Bean
    @BridgeTo("uaxDefaultChannel")
    public MessageChannel ucaDefaultChannel() {
        return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }

    @Bean
    @BridgeTo("uaDefaultChannel")
    public MessageChannel ualDefaultChannel() {
        return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }
    @Bean
    public IntegrationFlow uaEventFlow() {
        return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
                .transform(eventHandler, "parseEvent")
}

Итак, BridgeTo на исполнителя канал будет пересылать сообщения

1 Ответ

1 голос
/ 27 января 2020

следовательно, глубина очереди продолжает увеличиваться

Так как похоже, что ваша очередь находится где-то в JMS-брокере, и это действительно нормально для такого поведения. Это именно то, для чего были разработаны системы обмена сообщениями - чтобы различать guish производителя и потребителя и иметь дело с сообщениями в месте назначения, когда это возможно.

если вы хотите увеличить опрос от JMS, вы можно рассмотреть возможность иметь опцию concurrency в контейнере JMS:

/**
 * The concurrency to use.
 * @param concurrency the concurrency.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setConcurrency(String)
 */
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
    this.target.setConcurrency(concurrency);
    return this;
}

/**
 * The concurrent consumers number to use.
 * @param concurrentConsumers the concurrent consumers count.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
 */
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
    this.target.setConcurrentConsumers(concurrentConsumers);
    return this;
}

/**
 * The max for concurrent consumers number to use.
 * @param maxConcurrentConsumers the max concurrent consumers count.
 * @return current {@link JmsDefaultListenerContainerSpec}.
 * @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
 */
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
    this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
    return this;
}

См. дополнительную информацию в Документах: https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/integration.html#jms -прием

Но это победило ' t позволяет вам "назначать сообщения указанной ветке c". В JMS просто нет способа разделения.

Мы можем сделать это с помощью Spring Integration, используя router в соответствии с вашими «основанными на id сообщения» и конкретными экземплярами ExecutorChannel, настроенными с помощью одного с резьбой Executor. Каждый ExecutorChannel будет его отдельным исполнителем с одним потоком. Таким образом вы обеспечите порядок сообщений с одним и тем же ключом раздела и будете обрабатывать их параллельно. Все ExecutorChannel могут иметь одного и того же абонента или bridge для одного и того же канала для обработки.

Однако вы должны иметь в виду, что когда вы покидаете поток слушателя JMS, вы завершаете sh транзакцию JMS и не можете обработать сообщение в этом отдельном потоке, вы можете потерять сообщение.

...