Ответ получен после тайм-аута - PullRequest
0 голосов
/ 15 января 2019

Я хочу зарегистрировать слушателей двух очередей для Spring AMQP с последней версией Spring:

    @Bean
    public SimpleMessageListenerContainer processingTransactionSaleContainer(ConnectionFactory cf, TransactionElavonSaleListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_SALE);
        container.setMessageListener(new MessageListenerAdapter(listener, "transactionSaleProcess"));
        container.setMessageConverter(new SerializerMessageConverter());
        return container;
    }

    @Bean
    public SimpleMessageListenerContainer processingTransactionAuthorizeContainer(ConnectionFactory cf, TransactionAuthorizeListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_AUTHORIZE);
        container.setMessageListener(new MessageListenerAdapter(listener, "transactionAuthorizeProcess"));
        container.setMessageConverter(new SerializerMessageConverter());
        return container;
    }

Слушатель:

@Component
public class TransactionElavonSaleListener {

    public TransactionResponseFactory transactionElavonSaleProcess(TransactionRequestFactory ro) {
        ..... do some heavy network request
        return parseRawSuccessResponse(response);
    }

}

Когда я удаляю один из SimpleMessageListenerContainer, это рабочий файл, но когда я использую оба метода, я получаю следующее исключение:

00:40:14,469 INFO  [stdout] (pool-9-thread-5) 00:40:14.468 [pool-9-thread-5] WARN  o.s.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 1
00:40:14,472 INFO  [stdout] (pool-9-thread-5) 00:40:14.472 [pool-9-thread-5] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
00:40:14,473 INFO  [stdout] (pool-9-thread-5) org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
00:40:14,473 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
00:40:14,474 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
00:40:14,474 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
00:40:14,474 INFO  [stdout] (pool-9-thread-5)   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
00:40:14,474 INFO  [stdout] (pool-9-thread-5)   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
00:40:14,478 INFO  [stdout] (pool-9-thread-5)   at java.base/java.lang.Thread.run(Thread.java:844)
00:40:14,481 INFO  [stdout] (pool-9-thread-5) Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
00:40:14,494 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2446)
00:40:14,494 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115)
00:40:14,494 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
00:40:14,494 INFO  [stdout] (pool-9-thread-5)   ... 11 common frames omitted
00:40:14,495 INFO  [stdout] (pool-9-thread-5) 00:40:14.495 [pool-9-thread-5] ERROR o.s.a.r.l.DirectReplyToMessageListenerContainer - Failed to invoke listener
00:40:14,495 INFO  [stdout] (pool-9-thread-5) org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
00:40:14,495 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
00:40:14,495 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
00:40:14,496 INFO  [stdout] (pool-9-thread-5)   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
00:40:14,497 INFO  [stdout] (pool-9-thread-5)   at java.base/java.lang.Thread.run(Thread.java:844)
00:40:14,497 INFO  [stdout] (pool-9-thread-5) Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
00:40:14,497 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2446)
00:40:14,497 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115)
00:40:14,497 INFO  [stdout] (pool-9-thread-5)   at deployment.rest_api.war//org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
00:40:14,497 INFO  [stdout] (pool-9-thread-5)   ... 11 common frames omitted

Знаете ли вы, почему я получаю это исключение, когда у меня есть эти два метода?

EDITL Определенные слушатели:

    @Bean
    public SimpleMessageListenerContainer processingTransactionElavonSaleContainer(ConnectionFactory cf, TransactionElavonSaleListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_ELAVON_SALE);
        container.setMessageListener(new MessageListenerAdapter(listener, "transactionElavonSaleProcess"));
        return container;
    }

    @Bean
    public SimpleMessageListenerContainer processingTransactionElavonAuthorizeContainer(ConnectionFactory cf, TransactionElavonAuthorizeListener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_ELAVON_AUTHORIZE);
        container.setMessageListener(new MessageListenerAdapter(listener, "transactionElavonAuthorizeProcess"));
        return container;
    }

...

@Component
public class TransactionElavonSaleListener {

    public TransactionResponseFactory transactionElavonSaleProcess(TransactionRequestFactory ro) {

        return new TransactionResponseFactory();
    }
}


@Component
public class TransactionElavonAuthorizeListener {

    public TransactionResponseFactory transactionElavonAuthorizeProcess(TransactionRequestFactory tf) {

        TransactionResponseFactory obj = new TransactionResponseFactory();                  
        return obj;
    }
}

Отправить объект:

TransactionResponseFactory processingPeply = (TransactionResponseFactory) processingTransactionElavonAuthorizeTemplate.convertSendAndReceive(
        ContextServer.EXCHANGE_PROCESSING, ContextServer.ROUTING_KEY_PROCESSING_TRANSACTION_ELAVON, tf);
    System.out.println("!!!!! Received PROCESSING_TRANSACTION " + processingPeply.getTransaction_id());

1 Ответ

0 голосов
/ 15 января 2019

Наблюдение за вашей конфигурацией (однако, не относится к вашей проблеме) ...

  • Код declare... в вашем AmqpAdmin не требуется.
    • вы не должны никогда не взаимодействовать с брокером во время фазы определения компонента - это слишком рано.
    • они не нужны, поскольку администратор найдет компоненты Queue и Exchange и автоматически объявит их для вас при первом открытии соединения.

Да, этот установщик конвертера сообщений не используется для этого типа слушателя; конвертер сообщений должен быть установлен на MessageListenerAdapter.

Однако по умолчанию он получает SimpleMessageConverter, поэтому проблем не должно быть; этот конвертер обрабатывает сериализованные объекты, а также простой текст.

Теперь к вашей актуальной проблеме; добавление второго контейнера не должно иметь никакого влияния на стороне клиента; каждый шаблон получает свой собственный контейнер ответов, и по умолчанию используется прямой ответ, поэтому между ними не будет перекрестных разговоров (что может произойти, если вы используете именованную очередь ответов, но здесь это не так).

Я предлагаю вам включить ведение журнала DEBUG, чтобы выяснить, что происходит; если вам нужна помощь в их анализе; опубликуйте логи (как со стороны клиента, так и со стороны сервера), и я посмотрю.

EDIT

Ваши привязки неверны:

@Bean
public Binding bindingQueueProcessingElavonSale() {
    return BindingBuilder.bind(new Queue(QUEUE_PROCESSING_ELAVON_SALE, true))
            .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION_ELAVON);
}

@Bean
public Binding bindingQueueProcessingElavonAuthorize() {
    return BindingBuilder.bind(new Queue(QUEUE_PROCESSING_ELAVON_AUTHORIZE, true))
            .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION_ELAVON);
}

Вы связываете обе очереди с одним и тем же обменом с одним и тем же ключом маршрутизации - RabbitMQ будет отправлять сообщения с этим ключом маршрутизации в обе очереди, поэтому оба слушателя будут отвечать.

...