У меня есть производитель сообщений, который выдает около 15 сообщений в секунду
Потребитель - это весенний проект интеграции, который использует очередь сообщений и много обрабатывает. В настоящее время он является однопоточным и не может соответствовать скорости, с которой производитель отправляет сообщения. следовательно, глубина очереди продолжает увеличиваться
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();
. Можно ли использовать канал исполнителя для обработки этого в многопоточной среде и ускорить процесс потребителя
Если да, пожалуйста, предложите, как я могу достижения - Для обеспечения порядка сообщений мне нужно назначить сообщения одного типа (на основе идентификатора сообщения) одной и той же ветке канала исполнителя.
[ОБНОВЛЕННЫЙ КОД] Я создал следующие каналы исполнителя
public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
.executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
.executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
Теперь из основного потока сообщений я перенаправил на специальный маршрутизатор, который перенаправляет сообщение в канал исполнителя, как показано ниже -
@Bean
public IntegrationFlow baseEventFlow1() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").route(route()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(RPA_DEFAULT_CHANNEL);
} else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
return Collections.singletonList(SKW_DEFAULT_CHANNEL);
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
I будет иметь индивидуальный поток потребителей для соответствующего канала исполнителя.
Пожалуйста, исправьте мое понимание
[ОБНОВЛЕНО]
@Bean
@BridgeTo("uaxDefaultChannel")
public MessageChannel ucaDefaultChannel() {
return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel ualDefaultChannel() {
return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
}
Итак, BridgeTo на исполнителя канал будет пересылать сообщения