У меня есть Java-приложение, в котором задача Quartz запускается каждые 300
мс, и я пытаюсь получить несколько сообщений из очереди ActiveMQ, скажем, 5
, и обработать их, каждая запрашиваемая основная масса требует около 1
минуты для обработки доосвободить рабочий кварцевого планировщика
- Все потоки используют одно и то же соединение, которое блокируется синхронизированным блоком в методе приема, например:
protected List<?> receive(int bulkSize, String queueName, long receiveTimeoutMillis) {
LinkedList messages = new LinkedList();
try {
QueueReceiver receiver = (QueueReceiver)this.receivers.get(queueName);
if (receiver != null) {
ObjectMessage message = null;
int index = 1;
do {
message = (ObjectMessage)receiver.receive(receiveTimeoutMillis);
if (message != null) {
messages.add(message.getObject());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Message received from: " + receiver.getQueue().getQueueName() + " jms message id:" + message.getJMSMessageID());
}
message.acknowledge();
}
if (message == null) {
break;
}
++index;
} while(index <= bulkSize);
LOGGER.info("Consumed " + (index - 1) + " messages from " + queueName + ", elapsed time: " + stopWatch.getTime() + " ms");
} else {
LOGGER.warn("Queue not found " + queueName);
}
} catch (Exception var10) {
LOGGER.warn("error in performing receive: " + var10.getMessage(), var10);
}
return messages;
}
receiveTimeoutMillis
isвсегда 150
мс, bulkSize
, как я уже сказал, 5
.
, когда я запускаю свое приложение, каждый поток использует этот метод, получает большую часть 5
сообщений, но через пару минут они всеначать получать сообщения от 0
до 2
, хотя очередь составляет заполнена с сообщениями
Понятия не имею, почему это происходит!, поэтому, пожалуйста, дайте мне подсказкупроверьте это!
Примечания
, хотя я увеличиваю timeout
метода receive
из очереди, тем не менее получатель не получает никаких сообщений!
в лog Я вижу следующие сообщения от каждого потока, выполняющего вышеуказанный код:
2019-01-29 14:59:23,893 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-2] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:24,329 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-2] Consumed 1 messages from TRANSLATOR, elapsed time: 282 ms
2019-01-29 14:59:24,500 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:24,793 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:25,097 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-18] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:25,403 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-18] Consumed 1 messages from TRANSLATOR, elapsed time: 153 ms
2019-01-29 14:59:25,693 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:25,996 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:26,300 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:26,595 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:26,898 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:27,193 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
2019-01-29 14:59:27,496 INFO com.peer39.commons.pattern.jms.JMSService - [DefaultQuartzScheduler_Worker-15] Consumed 0 messages from TRANSLATOR, elapsed time: 150 ms
Я добавляю поток демона, который запускается каждые 300
мс
public class Daemon extends ParallelQuartzDaemon {
@Override
protected ICollectorAgent.ProcessStatus executeWork(JobExecutionContext jobExecutionContext,
Map<String, Double> properties,
Map<String, String> alerts) throws Exception {
ICollectorAgent.ProcessStatus processStatus = ICollectorAgent.ProcessStatus.SUCCESS;
List<FlowRequest> flowRequests = getFlowRequestsForTranslation();
if (!flowRequests.isEmpty()) {
//DO Work! //takes 1-2 minutes!
}
return processStatus;
}
private List<FlowRequest> getFlowRequestsForTranslation() {
translatorContext.getTranslatorJMSService().getFlowRequestsToTranslate(5);
}
}
и класс JMS:
public class TranslatorJMSService extends JMSService {
public List<FlowRequest> getFlowRequestsToTranslate(int count) {
final Long jmsReceiveTimeoutMillis = translatorConfiguration.getJmsReceiveTimeoutMillis();
return (List<FlowRequest>) receive(count, queueName, jmsReceiveTimeoutMillis);
}
}
и, наконец, метод приема упоминается выше.
спасибо!