У меня есть 10 очередей rabbitMQ, которые называются event.q.0, event.q.2, <...>, event.q.9.Каждая из этих очередей получает сообщения, перенаправленные из обмена event.consistent-hash.Я хочу создать отказоустойчивое решение, которое будет последовательно использовать сообщения для определенного события, так как порядок важен.Для этого я настроил поток, который прослушивает эти очереди и направляет сообщения на основе идентификатора события в конкретный рабочий поток.Рабочие потоки работают на основе каналов очереди, поэтому должны гарантировать порядок FIFO для события с определенным идентификатором.Я придумал следующую настройку:
@Bean
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate, Advice retryAdvice) {
return IntegrationFlows
.from(
Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
.configureContainer(c -> c
.adviceChain(retryAdvice())
.addQueueNames(queueNames)
.prefetchCount(amqpProperties.getPreMatch().getDefinition().getQueues().getEvent().getPrefetch())
)
.messageConverter(rabbitTemplate.getMessageConverter())
)
.<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
.get();
}
private Advice deadLetterAdvice() {
return RetryInterceptorBuilder
.stateless()
.maxAttempts(3)
.recoverer(recoverer())
.backOffPolicy(backOffPolicy())
.build();
}
private ExponentialBackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(3.0);
backOffPolicy.setMaxInterval(15000);
return backOffPolicy;
}
private MessageRecoverer recoverer() {
return new RepublishMessageRecoverer(
rabbitTemplate,
"error.exchange.dlx"
);
}
@PostConstruct
public void init() {
for (int i = 0; i < numberOfWorkers; i++) {
flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
.autoStartup(false)
.id(String.format("worker-flow-%d", i))
.register();
}
}
private IntegrationFlow workerFlow(QueueChannel channel) {
return IntegrationFlows
.from(channel)
.<Object, Class<?>>route(Object::getClass, m -> m
.resolutionRequired(true)
.defaultOutputToParentFlow()
.subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
.subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
)
.get();
}
Теперь, когда, скажем, ошибка возникает в eventConsumerFlow
, механизм повтора работает, как и ожидалось, но когда ошибка возникает в workerFlow
,повторная попытка больше не работает, и сообщение не отправляется на обмен мертвыми письмами.Я предполагаю, что это потому, что как только сообщение передается в QueueChannel, оно автоматически подтверждается.Как сделать так, чтобы механизм повторных попыток работал в workerFlow
, чтобы в случае возникновения исключения он мог повторить пару раз и отправить сообщение DLX, когда попытки исчерпаны?