Как обрабатывать ошибки после того, как сообщение было передано в QueueChannel? - PullRequest
0 голосов
/ 30 января 2019

У меня есть 10 очередей rabbitMQ, которые называются event.q.0, event.q.2, <...>, event.q.9.Каждая из этих очередей получает сообщения, перенаправленные из обмена event.consistent-hash.Я хочу создать отказоустойчивое решение, которое будет последовательно использовать сообщения для определенного события, так как порядок важен.Для этого я настроил поток, который прослушивает эти очереди и направляет сообщения на основе идентификатора события в конкретный рабочий поток.Рабочие потоки работают на основе каналов очереди, поэтому должны гарантировать порядок FIFO для события с определенным идентификатором.Я придумал следующую настройку:

@Bean
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate, Advice retryAdvice) {
    return IntegrationFlows
            .from(
                    Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
                            .configureContainer(c -> c
                                    .adviceChain(retryAdvice())
                                    .addQueueNames(queueNames)
                                    .prefetchCount(amqpProperties.getPreMatch().getDefinition().getQueues().getEvent().getPrefetch())
                            )
                            .messageConverter(rabbitTemplate.getMessageConverter())
            )
            .<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
            .get();
}

private Advice deadLetterAdvice() {
    return RetryInterceptorBuilder
            .stateless()
            .maxAttempts(3)
            .recoverer(recoverer())
            .backOffPolicy(backOffPolicy())
            .build();
}

private ExponentialBackOffPolicy backOffPolicy() {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(3.0);
    backOffPolicy.setMaxInterval(15000);

    return backOffPolicy;
}

private MessageRecoverer recoverer() {
    return new RepublishMessageRecoverer(
            rabbitTemplate,
            "error.exchange.dlx"
    );
}

@PostConstruct
public void init() {
    for (int i = 0; i < numberOfWorkers; i++) {
        flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
                .autoStartup(false)
                .id(String.format("worker-flow-%d", i))
                .register();
    }
}

private IntegrationFlow workerFlow(QueueChannel channel) {
    return IntegrationFlows
        .from(channel)
        .<Object, Class<?>>route(Object::getClass, m -> m
                .resolutionRequired(true)
                .defaultOutputToParentFlow()
                .subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
                .subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
        )
        .get();
}

Теперь, когда, скажем, ошибка возникает в eventConsumerFlow, механизм повтора работает, как и ожидалось, но когда ошибка возникает в workerFlow,повторная попытка больше не работает, и сообщение не отправляется на обмен мертвыми письмами.Я предполагаю, что это потому, что как только сообщение передается в QueueChannel, оно автоматически подтверждается.Как сделать так, чтобы механизм повторных попыток работал в workerFlow, чтобы в случае возникновения исключения он мог повторить пару раз и отправить сообщение DLX, когда попытки исчерпаны?

1 Ответ

0 голосов
/ 30 января 2019

Если вы хотите устойчивости, вам вообще не следует использовать каналы очереди;сообщения будут подтверждены сразу после помещения сообщения в очередь в памяти, в случае сбоя сервера эти сообщения будут потеряны.

Необходимо настроить отдельный адаптер для каждой очереди, если не требуется потеря сообщений.

При этом для ответа на общий вопрос любые ошибки в нисходящих потоках (в том числе после канала очереди) будут отправлены на errorChannel, определенный на входящем адаптере.

...