Spring Integration JMS обеспечивает доставку сообщений с использованием DSL - PullRequest
0 голосов
/ 26 июня 2018

Я пытаюсь создать поток (1), в котором сообщение принимается от адаптера TCP, который может быть клиентом или сервером, и отправляет сообщение брокеру ActiveMQ.

Мой другой поток (2) выбирает сообщение из требуемой очереди и отправляет по назначению

TCP (клиент / сервер) == (1) ==> ActiveMQ Broker == (2) ==> Исходящий HTTP-адаптер

Я хочу убедиться, что в случае, если мое сообщение не будет доставлено в требуемый пункт назначения, оно попытается повторно отправить сообщение.

Мой текущий поток (1) к брокеру:

IntegrationFlow flow = IntegrationFlows
            .from(Tcp
                    .inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
                            .serializer(customSerializer).deserializer(customSerializer)
                            .id("server").soTimeout(5000))
                    .id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
            .wireTap("tcpInboundMessageLogChannel").channel(directChannel())
            .handle(Jms.outboundAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .get();

    this.flowContext.registration(flow).id("outflow").register();

и Мой поток (2) от брокера к исходящему http:

flow = IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .channel(directChannel())
            .handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class)
                    .mappedRequestHeaders("abc"))
            .get();
    this.flowContext.registration(flow).id("inflow").register();

Проблема:

  • В случае каких-либо исключений во время доставки, например, мой целевой URL не работает, тогда он пытается отправить сообщение.

  • После неудачной попытки он повторяется 7 раз, т.е. max attempt to 7

  • Если попытка все еще не удалась, она отправляет сообщение в ActiveMQ.DLQ (очередь недоставленных сообщений) и не предпринимает повторных попыток, поскольку сообщение удаляется из фактической очереди и отправляется в ActiveMQ.DLQ.

Итак, я хочу сценарий, что сообщение не будет потеряно и сообщение будет обработано по порядку.

1 Ответ

0 голосов
/ 26 июня 2018

Во-первых: я считаю, что вы можете настроить jmsInbound для бесконечных повторных попыток:

/**
 * Configuration options for a messageConsumer used to control how messages are re-delivered when they
 * are rolled back.
 * May be used server side on a per destination basis via the Broker RedeliveryPlugin
 *
 * @org.apache.xbean.XBean element="redeliveryPolicy"
 *
 */
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {

С другой стороны, вы можете настроить .handle(Http.outboundChannelAdapter( для RequestHandlerRetryAdvice для аналогичного поведения повтора, но внутри приложения без циклических переходов в JMS и обратно: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice

Вот пример того, как его можно настроить с точки зрения Java DSL:

    @Bean
    public IntegrationFlow errorRecovererFlow() {
        return IntegrationFlows.from(Function.class, "errorRecovererFunction")
                .handle((GenericHandler<?>) (p, h) -> {
                    throw new RuntimeException("intentional");
                }, e -> e.advice(retryAdvice()))
                .get();
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
        return requestHandlerRetryAdvice;
    }

    @Bean
    public MessageChannel recoveryChannel() {
        return new DirectChannel();
    }

RequestHandlerRetryAdvice можно настроить с помощью RetryTemplate для применения чего-то вроде AlwaysRetryPolicy. См. Проект Spring Retry для получения дополнительной информации: https://github.com/spring-projects/spring-retry

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...