У меня есть сценарий использования, в котором я хочу периодически сливать все данные в очереди ActiveMQ, а затем останавливаться. Я не смог найти никакого способа сделать это на родном верблюде, поэтому я написал для этого боб, используя DefaultConsumerTemplate.
public void pollConsumer() throws Exception {
long count = 0;
try {
if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
consumer.start();
producer.start();
while ( true ) {
logger.trace("Awaiting message: " + ++count );
Exchange exchange = consumer.receive( consumerEndpoint, 10000 );
if ( exchange == null ) break;
logger.trace("Processing message: " + count );
producer.send( exchange );
consumer.doneUoW( exchange );
logger.trace("Processed message: " + count );
}
producer.stop();
consumer.stop();
logger.debug( "Consumed " + (count - 1) + " message" + ( count == 2 ? "." : "s." ) );
} catch ( Throwable t ) {
logger.error("Something went wrong!", t );
throw t;
}
}
Я пытался использовать receiveNoWait
, но часто ничего не получалось и, конечно, не истощал очередь. Когда я пытаюсь сократить время ожидания, скажем, до 1000 мс, процесс проходит слишком далеко, опустошая очередь, а затем останавливается, тайм-аут активирован.
Я (очевидно, ошибочно) понял, что время ожидания было для потребителя, чтобы что-то потреблять, но похоже, что даже когда потребитель должен что-то потреблять, если он не может доставить это мгновенно, recieveNoWait
терпит неудачу, и даже может потерпеть неудачу с коротким таймаутом.
Есть ли лучший способ убедиться, что я получаю все сообщения, находящиеся в очереди в данный момент, и затем останавливаюсь?
Спасибо!