Возможно ли сделать мультиприоритетным потребителем в ActiveMQ JMS? - PullRequest
0 голосов
/ 04 апреля 2019

Фон

Я использую сервер обработки, который использует ActiveMQ для хранения очереди заданий.Во время работы я бы хотел, чтобы работник продолжал выполнять задания того же типа (дешево оставаться в типе, но переключаться дорого), но переключать типы, если игнорируются задания другого типа.

Два простых случая:

  1. Задания ожидают в очереди.Когда работник становится доступным, он должен выбрать задание только что обработанного типа и вернуться к любому заданию, если не найдено ни одного задания.
  2. Рабочие ждут.Когда добавляется новое задание, его следует передать работнику, который только что обработал этот тип задания, и откатить любого работника, если такого работника нет.

Пример

Моя попытка сейчас упрощает вещи с prefetch = 0, и у каждого работника есть два потребителя с разными уровнями приоритета.Я опрашиваю высокоприоритетный один раз, а затем блокирую для низкоприоритетного.

Сообщения - это просто идентификатор задания (подробности в БД) со свойством для типа задания:

public void put(String jobId, String type) {
    jmsTemplate.send(session -> {
        TextMessage textMessage = session.createTextMessage(jobId);
        textMessage.setStringProperty("type", type);
        return textMessage;
    });
}

Всякий раз, когда работник запускает новый тип задания, он устанавливает двух потребителей с разными consumer.priority s

MessageConsumer consumer = null;
MessageConsumer defaultConsumer = null;

void onResourcesChange(String type) throws JMSException {
    stopConsumers();
    if (type != null) {
        consumer = session.createConsumer(createConsumer(2), "type = '" + type + "'");
    }
    defaultConsumer = session.createConsumer(createConsumer(1));
}

private Destination createConsumer(int consumerPriority) throws JMSException {
    return jmsTemplate.getDestinationResolver().resolveDestinationName(session,
            queueName + "?consumer.prefetchSize=0&consumer.priority=" + consumerPriority.priority, false);
}

А затем, когда он простаивает, он получает сообщения в цикле:

while (!shutdown) {
    Message nextMessage = null;
    if (consumer != null) {
        nextMessage = consumer.receiveNoWait(100);
    }

    if (nextMessage == null) {
        if (defaultConsumer != null) {
            nextMessage = defaultConsumer.receive();
        }
    }
    // do something with the message
}

Вопрос

Есть ли способ прослушивания сообщений о заданиях с подходящим типом, с откатом к любому?

НапримерМожете ли вы иметь общий предел предварительной выборки между двумя потребителями и получать сообщения, которые лучше соответствуют друг другу, пока их нет?

Или есть способ выразить оба в одном и том же селекторе?Возможно с возрастом вместо приоритета?

priority1(type = 'type') || priority2(type != 'type')
type = 'type' || age > 5min
...