Фон
Я использую сервер обработки, который использует ActiveMQ для хранения очереди заданий.Во время работы я бы хотел, чтобы работник продолжал выполнять задания того же типа (дешево оставаться в типе, но переключаться дорого), но переключать типы, если игнорируются задания другого типа.
Два простых случая:
- Задания ожидают в очереди.Когда работник становится доступным, он должен выбрать задание только что обработанного типа и вернуться к любому заданию, если не найдено ни одного задания.
- Рабочие ждут.Когда добавляется новое задание, его следует передать работнику, который только что обработал этот тип задания, и откатить любого работника, если такого работника нет.
Пример
Моя попытка сейчас упрощает вещи с prefetch = 0, и у каждого работника есть два потребителя с разными уровнями приоритета.Я опрашиваю высокоприоритетный один раз, а затем блокирую для низкоприоритетного.
Сообщения - это просто идентификатор задания (подробности в БД) со свойством для типа задания:
public void put(String jobId, String type) {
jmsTemplate.send(session -> {
TextMessage textMessage = session.createTextMessage(jobId);
textMessage.setStringProperty("type", type);
return textMessage;
});
}
Всякий раз, когда работник запускает новый тип задания, он устанавливает двух потребителей с разными consumer.priority
s
MessageConsumer consumer = null;
MessageConsumer defaultConsumer = null;
void onResourcesChange(String type) throws JMSException {
stopConsumers();
if (type != null) {
consumer = session.createConsumer(createConsumer(2), "type = '" + type + "'");
}
defaultConsumer = session.createConsumer(createConsumer(1));
}
private Destination createConsumer(int consumerPriority) throws JMSException {
return jmsTemplate.getDestinationResolver().resolveDestinationName(session,
queueName + "?consumer.prefetchSize=0&consumer.priority=" + consumerPriority.priority, false);
}
А затем, когда он простаивает, он получает сообщения в цикле:
while (!shutdown) {
Message nextMessage = null;
if (consumer != null) {
nextMessage = consumer.receiveNoWait(100);
}
if (nextMessage == null) {
if (defaultConsumer != null) {
nextMessage = defaultConsumer.receive();
}
}
// do something with the message
}
Вопрос
Есть ли способ прослушивания сообщений о заданиях с подходящим типом, с откатом к любому?
НапримерМожете ли вы иметь общий предел предварительной выборки между двумя потребителями и получать сообщения, которые лучше соответствуют друг другу, пока их нет?
Или есть способ выразить оба в одном и том же селекторе?Возможно с возрастом вместо приоритета?
priority1(type = 'type') || priority2(type != 'type')
type = 'type' || age > 5min