Изменение системного времени приводит к тому, что rabbitmq не работает обмен письмами - PullRequest
0 голосов
/ 21 сентября 2019

Ниже мой код.Я изменил время операционной системы на будущее, а затем вернулся обратно, произошло странное явление.Мое сообщение остается в cloudOpExecutionDelayChannel и больше не отправляется в cloudOpQueryExecutionStateChannel.Есть ли решение и идея?

XML:

<rabbit:queue name="cloudOpExecutionDelayChannel">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">15000</value>
        </entry>
        <entry key="x-max-length">
            <value type="java.lang.Long">1000000</value>
        </entry>
        <entry key="x-dead-letter-exchange">
            <value type="java.lang.String">cloud.execution.delay.exchange</value>
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:queue name="cloudOpQueryExecutionStateChannel"></rabbit:queue>

<rabbit:topic-exchange name="cloud.execution.delay.exchange">
    <rabbit:bindings>
        <rabbit:binding pattern="cloudOpExecutionDelayChannel" queue="cloudOpQueryExecutionStateChannel"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

Java:

@Bean
public AmqpChannelFactoryBean cloudifyOpQueryExecutionStateChannel(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("cloudOpQueryExecutionStateChannel");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@ServiceActivator(inputChannel = "cloudifyOpQueryExecutionStateChannel")
public void doQueryExecutionStateFlow(CloudifyOpMessage message) {
    logger.info("query execution status flow for dp: " + message.deploymentId);
    .......
        if (CLOUDIFY_TERMINATED_STATE.equals(executionResponse.getState())) {
            eventGateway.send(message.nextExecutionChannel, message);
        } else if (CLOUDIFY_RESOURCE_NOT_FOUNND_STATE.equals(executionResponse.getState())) {
            logger.warn("could not found any resource by deployment: {}, task: {}, skip this step and jump to next call back queue",
                    message.deploymentId, message.taskId);
            eventGateway.send(message.nextExecutionChannel, message);
        } else if (CLOUDIFY_FAILED_STATE.equals(executionResponse.getState()) ||
                CLOUDIFY_CANCELLED_STATE.equals(executionResponse.getState())) {
            CloudifyOpMessage msg = new CloudifyOpMessage();
            msg.taskId = message.taskId;
            msg.resultMsg = extractErrorMessage(executionResponse);
            msg.isCancelled = Objects.equals(CLOUDIFY_CANCELLED_STATE, executionResponse.getState());
            callBackGateway.send(message.failureCallBackChannel, msg);
        } else {
            eventGateway.send("cloudOpExecutionDelayChannel", message);
        }
    } catch (Exception e) {
        handleConsumerException(message, e);
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...