Потребитель ActiveMQ не получает желаемого количества сообщений - PullRequest
0 голосов
/ 29 января 2019

У меня есть Java-приложение, в котором задача Quartz запускается каждые 300 мс, и я пытаюсь получить несколько сообщений из очереди ActiveMQ, скажем, 5, и обработать их, каждая запрашиваемая основная масса требует около 1 минуты для обработки доосвободить рабочий кварцевого планировщика

  • Все потоки используют одно и то же соединение, которое блокируется синхронизированным блоком в методе приема, например:
protected List<?> receive(int bulkSize, String queueName, long receiveTimeoutMillis) {
        LinkedList messages = new LinkedList();

        try {
            QueueReceiver receiver = (QueueReceiver)this.receivers.get(queueName);
            if (receiver != null) {
                ObjectMessage message = null;
                int index = 1; 

                do {
                        message = (ObjectMessage)receiver.receive(receiveTimeoutMillis);

                    if (message != null) {
                        messages.add(message.getObject());
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Message received from: " + receiver.getQueue().getQueueName() + " jms message id:" + message.getJMSMessageID());
                        }

                        message.acknowledge();
                    }

                    if (message == null) {
                        break;
                    }

                    ++index;
                } while(index <= bulkSize);


                LOGGER.info("Consumed " + (index - 1) + " messages from " + queueName + ", elapsed time: " + stopWatch.getTime() + " ms");
            } else {
                LOGGER.warn("Queue not found " + queueName);
            }
        } catch (Exception var10) {
            LOGGER.warn("error in performing receive: " + var10.getMessage(), var10);
        }

        return messages;
    }

receiveTimeoutMillis isвсегда 150 мс, bulkSize, как я уже сказал, 5.

, когда я запускаю свое приложение, каждый поток использует этот метод, получает большую часть 5 сообщений, но через пару минут они всеначать получать сообщения от 0 до 2, хотя очередь составляет заполнена с сообщениями

Понятия не имею, почему это происходит!, поэтому, пожалуйста, дайте мне подсказкупроверьте это!

Примечания

  • , хотя я увеличиваю timeout метода receive из очереди, тем не менее получатель не получает никаких сообщений!

  • в лog Я вижу следующие сообщения от каждого потока, выполняющего вышеуказанный код:


2019-01-29 14:59:23,893 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-2] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:24,329 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-2] Consumed 1 messages from TRANSLATOR, elapsed time: 282 ms
2019-01-29 14:59:24,500 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:24,793 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:25,097 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-18] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:25,403 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-18] Consumed 1 messages from TRANSLATOR, elapsed time: 153 ms
2019-01-29 14:59:25,693 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:25,996 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:26,300 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:26,595 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:26,898 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:27,193 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:27,496 INFO  com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms

Я добавляю поток демона, который запускается каждые 300 мс

public class Daemon extends ParallelQuartzDaemon {

   @Override
    protected ICollectorAgent.ProcessStatus executeWork(JobExecutionContext jobExecutionContext,
                                                        Map<String, Double> properties,
                                                        Map<String, String> alerts) throws Exception {

        ICollectorAgent.ProcessStatus processStatus = ICollectorAgent.ProcessStatus.SUCCESS;
        List<FlowRequest> flowRequests = getFlowRequestsForTranslation();
        if (!flowRequests.isEmpty()) {
           //DO Work! //takes 1-2 minutes!
        }
        return processStatus;
    }

 private List<FlowRequest> getFlowRequestsForTranslation() {
     translatorContext.getTranslatorJMSService().getFlowRequestsToTranslate(5);
    }

}


и класс JMS:

public class TranslatorJMSService extends JMSService {

   public List<FlowRequest> getFlowRequestsToTranslate(int count) {
        final Long jmsReceiveTimeoutMillis = translatorConfiguration.getJmsReceiveTimeoutMillis();
            return (List<FlowRequest>) receive(count, queueName, jmsReceiveTimeoutMillis);
    }

}

и, наконец, метод приема упоминается выше.

спасибо!

...