Я пытаюсь создать поток (1), в котором сообщение принимается от адаптера TCP, который может быть клиентом или сервером, и отправляет сообщение брокеру ActiveMQ.
Мой другой поток (2) выбирает сообщение из требуемой очереди и отправляет по назначению
TCP (клиент / сервер) == (1) ==> ActiveMQ Broker == (2) ==> Исходящий HTTP-адаптер
Я хочу убедиться, что в случае, если мое сообщение не будет доставлено в требуемый пункт назначения, оно попытается повторно отправить сообщение.
Мой текущий поток (1) к брокеру:
IntegrationFlow flow = IntegrationFlows
.from(Tcp
.inboundAdapter(Tcp.netServer(Integer.parseInt(1234))
.serializer(customSerializer).deserializer(customSerializer)
.id("server").soTimeout(5000))
.id(hostConnection.getConnectionNumber() + "adapter"))).channel(directChannel())
.wireTap("tcpInboundMessageLogChannel").channel(directChannel())
.handle(Jms.outboundAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.get();
this.flowContext.registration(flow).id("outflow").register();
и Мой поток (2) от брокера к исходящему http:
flow = IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
.destination("jmsInbound"))
.channel(directChannel())
.handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.mappedRequestHeaders("abc"))
.get();
this.flowContext.registration(flow).id("inflow").register();
Проблема:
В случае каких-либо исключений во время доставки, например, мой целевой URL не работает, тогда он пытается отправить сообщение.
После неудачной попытки он повторяется 7 раз, т.е. max attempt to 7
Если попытка все еще не удалась, она отправляет сообщение в ActiveMQ.DLQ
(очередь недоставленных сообщений) и не предпринимает повторных попыток, поскольку сообщение удаляется из фактической очереди и отправляется в ActiveMQ.DLQ
.
Итак, я хочу сценарий, что сообщение не будет потеряно и сообщение будет обработано по порядку.