Ниже мой код.Я изменил время операционной системы на будущее, а затем вернулся обратно, произошло странное явление.Мое сообщение остается в cloudOpExecutionDelayChannel и больше не отправляется в cloudOpQueryExecutionStateChannel.Есть ли решение и идея?
XML:
<rabbit:queue name="cloudOpExecutionDelayChannel">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">15000</value>
</entry>
<entry key="x-max-length">
<value type="java.lang.Long">1000000</value>
</entry>
<entry key="x-dead-letter-exchange">
<value type="java.lang.String">cloud.execution.delay.exchange</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="cloudOpQueryExecutionStateChannel"></rabbit:queue>
<rabbit:topic-exchange name="cloud.execution.delay.exchange">
<rabbit:bindings>
<rabbit:binding pattern="cloudOpExecutionDelayChannel" queue="cloudOpQueryExecutionStateChannel"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
Java:
@Bean
public AmqpChannelFactoryBean cloudifyOpQueryExecutionStateChannel(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("cloudOpQueryExecutionStateChannel");
factoryBean.setPubSub(false);
return factoryBean;
}
@ServiceActivator(inputChannel = "cloudifyOpQueryExecutionStateChannel")
public void doQueryExecutionStateFlow(CloudifyOpMessage message) {
logger.info("query execution status flow for dp: " + message.deploymentId);
.......
if (CLOUDIFY_TERMINATED_STATE.equals(executionResponse.getState())) {
eventGateway.send(message.nextExecutionChannel, message);
} else if (CLOUDIFY_RESOURCE_NOT_FOUNND_STATE.equals(executionResponse.getState())) {
logger.warn("could not found any resource by deployment: {}, task: {}, skip this step and jump to next call back queue",
message.deploymentId, message.taskId);
eventGateway.send(message.nextExecutionChannel, message);
} else if (CLOUDIFY_FAILED_STATE.equals(executionResponse.getState()) ||
CLOUDIFY_CANCELLED_STATE.equals(executionResponse.getState())) {
CloudifyOpMessage msg = new CloudifyOpMessage();
msg.taskId = message.taskId;
msg.resultMsg = extractErrorMessage(executionResponse);
msg.isCancelled = Objects.equals(CLOUDIFY_CANCELLED_STATE, executionResponse.getState());
callBackGateway.send(message.failureCallBackChannel, msg);
} else {
eventGateway.send("cloudOpExecutionDelayChannel", message);
}
} catch (Exception e) {
handleConsumerException(message, e);
}
}