Spring Integration JmsOutboundGateway Запрос - поток ответов не работает - PullRequest
0 голосов
/ 07 января 2020

Я пытаюсь реализовать поток запросов / ответов JmsOutboundGateway. Мой поток отправляет сообщение через JmsOutboundGateway в ActiveMQ. Это отлично работает. После этого я слушаю другое имя очереди, которое должно быть ответом для шлюза. Однако это не работает. Я не получаю ответ правильно.

@Configuration
public class VinnaOutboundConfig {

   @Value("${file.output.backup-dir}")
   private String backupDir;

   @Value("${jms.entry}")
   private String jmsFileInboxDestination;

   @Value("${jms.vinna}")
   private String jmsVinnaDestination;

   @Value("${jms.vinna-response}")
   private String jmsVinnaResponseDestination;

   private final MessageHandler writeBackupFileMessageHandler;
   private final ConnectionFactory connectionFactory;

   @Autowired
   public VinnaOutboundConfig(MessageHandler writeBackupFileMessageHandler,
         ConnectionFactory connectionFactory) {
      this.writeBackupFileMessageHandler = writeBackupFileMessageHandler;
      this.connectionFactory = connectionFactory;
   }

   @Bean
   public MessageChannel vinnaRequestChannel() { return new DirectChannel(); }

   @Bean
   public PollableChannel vinnaReplyChannel() { return new QueueChannel(); }

   @Bean
   public MessageChannel vinnaErrorChannel() { return new DirectChannel(); }

   @Bean
   public IntegrationFlow vinnaOutboundFlow() throws ParserConfigurationException {
      return IntegrationFlows
            // receive message from queue
            .from(jmsInboundGateway())
            .transform(new ResultToStringTransformer())
            .handle(jmsVinnaOutboundGateway())
            .get();
   }

   @Bean
   public JmsOutboundGateway jmsVinnaOutboundGateway() {
      JmsOutboundGateway gateway = new JmsOutboundGateway();
      gateway.setConnectionFactory(connectionFactory);
      gateway.setCorrelationKey("JMSCorrelationID");
      gateway.setRequestDestinationName(jmsVinnaDestination);
      gateway.setAsync(true);
      gateway.setUseReplyContainer(true);
      gateway.setReplyContainerProperties(replyContainerProperties());
      gateway.setReplyChannel(vinnaReplyChannel());
      gateway.setReplyDestinationName(jmsVinnaResponseDestination);
      gateway.setReceiveTimeout(10000L);
      return gateway;
   }

   @Bean
   public JmsOutboundGateway.ReplyContainerProperties replyContainerProperties() {
      JmsOutboundGateway.ReplyContainerProperties properties = new JmsOutboundGateway.ReplyContainerProperties();
      properties.setTaskExecutor(new SimpleAsyncTaskExecutor());
      properties.setConcurrentConsumers(1);
      return properties;
   }

   @Bean
   public JmsInboundGateway jmsInboundGateway() {
      JmsInboundGateway gateway = new JmsInboundGateway(simpleMessageListenerContainer(), channelPublishingJmsMessageListener());
      gateway.setRequestChannel(vinnaRequestChannel());
      gateway.setErrorChannel(vinnaErrorChannel());
      return gateway;
   }

   @Bean
   public SimpleMessageListenerContainer simpleMessageListenerContainer() {
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
      container.setConnectionFactory(connectionFactory);
      container.setDestinationName(jmsFileInboxDestination);
      container.setTaskExecutor(new SimpleAsyncTaskExecutor());
      return container;
   }

   @Bean
   public ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener() {
      ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener = new ChannelPublishingJmsMessageListener();
      channelPublishingJmsMessageListener.setExpectReply(true);
      channelPublishingJmsMessageListener.setReplyTimeout(20000L);
      return channelPublishingJmsMessageListener;
   }
@MessageEndpoint
public class VinnaReplyService {

   private static final Logger LOG = LoggerFactory.getLogger(VinnaReplyService.class);

   @ServiceActivator(inputChannel = "vinnaReplyChannel", async = "true", poller = @Poller(fixedDelay = "2000"))
   public void handleVinnaReply(Message<?> message) {
      LOG.info("vinna reply message: '{}'", message.toString());

      // this method never gets invoked!!
      // the reply message should arrive here.

   }
}

Когда я отправляю сообщение в Очередь ответов через ActiveMQ Toolpanel и точный заданный correlationId, который был создан исходящим шлюзом, я получаю следующее предупреждение:

o.s.integration.jms.JmsOutboundGateway : Late reply for 6bfa4ca7-77d2-4b57-9ad0-5550ab5a617c_1

Но сообщение никогда не приходит в мой replyChannel. Почему это поведение? Я не могу получить ответное сообщение из моей очереди на ответный канал.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...