Почему мой ConsumerTemplate не читает сообщения из темы ActiveMQ? - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь время от времени получать сообщения в качестве получателя опроса из темы ActiveMQ с использованием конечной точки подписчика длительного пользования.

В моем компоненте у меня есть шаблон ConsumerTemplate, из которого я пытаюсьпринимать обмены и отправлять на другой URI.

Метод бина:

public void pollConsumer() throws Exception {
    long count = 0;
    try {
    if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
    logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
    consumer.start();
    producer.start();
    while ( true ) {
        logger.trace("Awaiting message: " + ++count );
        Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
        if ( exchange == null ) break;
        logger.trace("Processing message: " + count );
        producer.send( exchange );
        consumer.doneUoW( exchange );
        logger.trace("Processed message: " + count );
    }
    producer.stop();
    consumer.stop();
    } catch ( Throwable t ) {
        logger.error("Something went wrong!", t );
        throw t;
    }
}

При вызове регистратор отображает сообщение «Потребление» в виде

activemq://topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient&selector=RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27

, насколько я вижу, это правильно (селектор должен читать RecordType IN ('1', '2') AND SubType = '2' без кодировки URL.

Я получаю один журнал "Ожидание сообщения" и ничего больше, поэтому кажется, что ничего

Как ни странно, он также не регистрируется в ActiveMQ как надежный подписчик, поэтому кажется, что он вообще ничего не делает, но и не регистрирует никаких ошибок, так что я довольно озадачен.

Кто-нибудь может подсказать, почему это может не сработать, или, по крайней мере, с чего мне начать искать?

Ответы [ 2 ]

0 голосов
/ 29 ноября 2018

Приняв во внимание ответ @pcoates и попытавшись увеличить время ожидания для целей тестирования, стало очевидно, что проблема заключалась в том, что опция долговременной подписки на URI не была задействована, а новые сообщения по этой теме отсутствовали.в течение 1 секунды ожидания ничего не произошло.

Ответ на другой вопрос , касающийся долговременных подписок, объясняет, что нельзя использовать долговременную подписку от потребителя опроса.

Поэтому мой обходной путь - подписаться на тему и направить сообщение в новую очередь, а также включить получателя опроса в эту новую очередь.Это не очень хорошо, поскольку я бы предпочел не иметь дополнительной очереди, но она работает и требует меньше усилий, чем написание новой версии JMSPollingConsumer.

0 голосов
/ 28 ноября 2018

Ваш pollConsumer остановится, если ему придется ждать более секунды, пока сообщение не окажется в очереди / теме.

Он ждет сообщения в течение 1 секунды, после чего возвращает ноль, прерывает цикл while и останавливает потребителя.

    Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
==> if ( exchange == null ) break;
    logger.trace("Processing message: " + count );
    producer.send( exchange );
    consumer.doneUoW( exchange );

Было бы проще просто использовать Apache-Camel Route, чтобы сделать то, что вы описываете.

...