Я пытаюсь реализовать поток запросов / ответов JmsOutboundGateway. Мой поток отправляет сообщение через JmsOutboundGateway в ActiveMQ. Это отлично работает. После этого я слушаю другое имя очереди, которое должно быть ответом для шлюза. Однако это не работает. Я не получаю ответ правильно.
@Configuration
public class VinnaOutboundConfig {
@Value("${file.output.backup-dir}")
private String backupDir;
@Value("${jms.entry}")
private String jmsFileInboxDestination;
@Value("${jms.vinna}")
private String jmsVinnaDestination;
@Value("${jms.vinna-response}")
private String jmsVinnaResponseDestination;
private final MessageHandler writeBackupFileMessageHandler;
private final ConnectionFactory connectionFactory;
@Autowired
public VinnaOutboundConfig(MessageHandler writeBackupFileMessageHandler,
ConnectionFactory connectionFactory) {
this.writeBackupFileMessageHandler = writeBackupFileMessageHandler;
this.connectionFactory = connectionFactory;
}
@Bean
public MessageChannel vinnaRequestChannel() { return new DirectChannel(); }
@Bean
public PollableChannel vinnaReplyChannel() { return new QueueChannel(); }
@Bean
public MessageChannel vinnaErrorChannel() { return new DirectChannel(); }
@Bean
public IntegrationFlow vinnaOutboundFlow() throws ParserConfigurationException {
return IntegrationFlows
// receive message from queue
.from(jmsInboundGateway())
.transform(new ResultToStringTransformer())
.handle(jmsVinnaOutboundGateway())
.get();
}
@Bean
public JmsOutboundGateway jmsVinnaOutboundGateway() {
JmsOutboundGateway gateway = new JmsOutboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setCorrelationKey("JMSCorrelationID");
gateway.setRequestDestinationName(jmsVinnaDestination);
gateway.setAsync(true);
gateway.setUseReplyContainer(true);
gateway.setReplyContainerProperties(replyContainerProperties());
gateway.setReplyChannel(vinnaReplyChannel());
gateway.setReplyDestinationName(jmsVinnaResponseDestination);
gateway.setReceiveTimeout(10000L);
return gateway;
}
@Bean
public JmsOutboundGateway.ReplyContainerProperties replyContainerProperties() {
JmsOutboundGateway.ReplyContainerProperties properties = new JmsOutboundGateway.ReplyContainerProperties();
properties.setTaskExecutor(new SimpleAsyncTaskExecutor());
properties.setConcurrentConsumers(1);
return properties;
}
@Bean
public JmsInboundGateway jmsInboundGateway() {
JmsInboundGateway gateway = new JmsInboundGateway(simpleMessageListenerContainer(), channelPublishingJmsMessageListener());
gateway.setRequestChannel(vinnaRequestChannel());
gateway.setErrorChannel(vinnaErrorChannel());
return gateway;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName(jmsFileInboxDestination);
container.setTaskExecutor(new SimpleAsyncTaskExecutor());
return container;
}
@Bean
public ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener() {
ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener = new ChannelPublishingJmsMessageListener();
channelPublishingJmsMessageListener.setExpectReply(true);
channelPublishingJmsMessageListener.setReplyTimeout(20000L);
return channelPublishingJmsMessageListener;
}
@MessageEndpoint
public class VinnaReplyService {
private static final Logger LOG = LoggerFactory.getLogger(VinnaReplyService.class);
@ServiceActivator(inputChannel = "vinnaReplyChannel", async = "true", poller = @Poller(fixedDelay = "2000"))
public void handleVinnaReply(Message<?> message) {
LOG.info("vinna reply message: '{}'", message.toString());
// this method never gets invoked!!
// the reply message should arrive here.
}
}
Когда я отправляю сообщение в Очередь ответов через ActiveMQ Toolpanel и точный заданный correlationId, который был создан исходящим шлюзом, я получаю следующее предупреждение:
o.s.integration.jms.JmsOutboundGateway : Late reply for 6bfa4ca7-77d2-4b57-9ad0-5550ab5a617c_1
Но сообщение никогда не приходит в мой replyChannel. Почему это поведение? Я не могу получить ответное сообщение из моей очереди на ответный канал.