Spring Integration Java DSL с JMS-слушателем с исполнителем задач и автоматическим выключателем - PullRequest
0 голосов
/ 10 марта 2020

Я относительно новичок в Spring Integration Java DSL. Я хотел бы реализовать довольно простой сценарий, который в настоящее время выполняется приложением Spring Boot

  1. Получение сообщения из очереди JMS (то есть ActiveMQ). Я хотел бы использовать taskExecutor для применения настраиваемого размера пула для многопоточного подхода
  2. для преобразования полезной нагрузки в формат xml. в случае ошибки на преобразователе я хотел бы остановить цепочку (не нужно переходить к шагу 3)
  3. вызвать другой канал, чтобы отправить полезную нагрузку xml в мое основное приложение через rest. В этой части я хотел бы применить автоматический выключатель в случае любой ошибки при вызове моего основного приложения через rest

, в настоящее время моя логика c довольно проста

 @Bean
public IntegrationFlow jmsMessageDrivenFlowWithContainer() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(
                    Jms.container(this.jmsConnectionFactory, recordDestinationQueue)
                            .concurrentConsumers(concurrentConsumers)
                            .maxConcurrentConsumers(maxConcurrentConsumers))
                    .errorChannel("errorChannel"))
            .transform(msgTransformer, "transform")
            .channel(this.handleChannel())
            .get();
}


@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows.from("errorChannel")
            .handle(m -> {

                MessagingException me = (MessagingException) m.getPayload();
                LOGGER.debug("Message: " + me.getFailedMessage() + "\nFailed with "
                        + me.getCause().getMessage());
            })
            .get();
}

Может кто угодно помогите мне, какой лучший подход, чтобы применить мой случай. спасибо

1 Ответ

0 голосов
/ 10 марта 2020

хотел бы использовать taskExecutor для применения настраиваемого размера пула для многопоточного подхода

Это неправильный подход; вы рискуете потерять сообщения при передаче сообщений в другой поток.

Чтобы увеличить параллелизм, вам нужно вместо этого увеличить параллелизм в контейнере слушателя.

Вы также смешиваете конфигурацию DSL и Java.

Используйте взамен

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(...))
    .transform(...)
    ...

.

РЕДАКТИРОВАТЬ

Не используйте .get() для контейнера spe c там он не будет правильно инициализирован. Каркас должен создать bean-компонент.

Кроме того, вам все равно нужно добавить параллелизм в контейнер.

Добавить errorChannel к адаптеру для обработки ошибок.

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination)
                    .concurrentConsumers(5))
                .errorChannel(someErrorChannel))
            .transform(xsltTransformer, "doTransform")
            ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...