Как бороться с потребителем сообщений jms с помощью DUPS_OK_ACKNOWLEDGE в исключительной ситуации - PullRequest
0 голосов
/ 18 мая 2018

Я использую сообщения из очереди Tibco, с session transacted true я вижу пропускную способность 13 мсг / с, с режимами session transacted as false и DUPS_OK_ACKNOWLEDGE ack - пропускная способность 160 мсг / с, что многообещающе, но когдав приложении возникает исключение, после чего сообщение теряется.

Может ли кто-нибудь подсказать мне, как справиться с такой ситуацией, когда мне нужна высокая пропускная способность при той же потере сообщения при исключении.

    .from(Jms.messageDrivenChannelAdapter(tibcoConnectionFactory)
            .destination(sourceQueue)
            .configureListenerContainer(spec -> {
                spec.sessionTransacted(false);
                spec.sessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
            }))
            .transform(orderTransformer, "transform", e -> e.advice(idempotentReceiverInterceptor())
            .handle(orderService, "save")
            .get();

1 Ответ

0 голосов
/ 18 мая 2018

.from(Jms.messageDrivenChannelAdapter(tibcoConnectionFactory)

будет использовать контейнер по умолчанию, который может потерять сообщения, если не будет обработан, потому что слушатель вызывается за пределами consumer.receive().

Использование

.from(Jms.messageDrivenChannelAdapter(tibcoConnectionFactory, 
    SimpleMessageListenerContainer.class)

вместо.

РЕДАКТИРОВАТЬ

Сообщение об ошибке должно было быть подсказкой ...

2018-05-18 11: 38: 43.657 ПРЕДУПРЕЖДЕНИЕ 48531 --- [Session Task-1] osjlSimpleMessageListenerContainer: сбой выполнения прослушивателя сообщений JMS и не был установлен ErrorHandler.

Поэтому добавьте ошибкуобработчик ...

@SpringBootApplication
public class So50413144Application {

    public static void main(String[] args) {
        SpringApplication.run(So50413144Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            for (int i = 0; i < 10; i++) {
                template.convertAndSend("foo", "foo" + i);
            }
        };
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory, SimpleMessageListenerContainer.class)
                        .destination("foo")
                        .configureListenerContainer(c -> {
                            c.sessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
                            c.errorHandler(t -> {
                                if (t instanceof RuntimeException) {
                                    throw (RuntimeException) t;
                                }
                            });
                        }))
                .handle((p, h) -> {
                    System.out.println(p);
                    if (p.equals("foo5")) {
                        throw new RuntimeException("fail");
                    }
                    try {
                        Thread.sleep(200);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }

}

... и foo5 доставляется снова и снова.

...