Как предотвратить голодание сообщений с низким приоритетом в очереди с приоритетом ActiveMQ? - PullRequest
4 голосов
/ 18 июня 2011

Я работаю в системе, где нам нужно реализовать приоритетную очередь. У нас есть сообщения с разными приоритетами, и нам нужно обрабатывать сообщения на основе приоритетов. В настоящее время мы стремимся использовать ActiveMQ в качестве нашей технологии организации очередей по многим причинам, одна из которых заключается в том, что она поддерживает очереди с приоритетами.

Как лучше всего справляться с голодом в очереди с приоритетом в ActiveMQ? Чтобы быть точным, мы должны гарантировать, что даже сообщение с низким приоритетом в конечном итоге обрабатывается, даже если сообщения с более высоким приоритетом продолжают заполнять очередь. Есть ли в ActiveMQ что-то встроенное? Или нам нужно создать что-то свое, чтобы повысить приоритет по мере старения сообщения?

1 Ответ

4 голосов
/ 18 июня 2011

Основной способ сделать это - повысить приоритет, когда сообщение становится старше.

. Таким образом, сообщение с низким приоритетом, скажем, час назад, имеет более высокий приоритет, чем новое сообщение с высоким приоритетом

public class Message implements Comparable<Message>{

    private final long time;//timestamp from creation (can be altered to insertion in queue) in millis the lower this value the older the message (and more important that it needs to be handled)
    private final int pr;//priority the higher the value the higher the priority

    /**
     * the offset that the priority brings currently set for 3 hours 
     *
     * meaning a message with pr==1 has equal priority than a message with pr==0 from 3 hours ago
     */
    private static final long SHIFT=3*60*60*1000; 

    public Message(int priority){
        this.pr=priority;
        this.time = System.currentTimeMillis();
    }

    //I'm assuming here the priority sorting is done with natural ordering
    public boolean compareTo(Message other){
        long th = this.time-this.pr*SHIFT;
        long ot = other.time-other.pr*SHIFT;
        if(th<ot)return 1;
        if(th>ot)return -1;
        return 0;
    }

}

, как отмечалось в комментариях, однако поток сообщений с низким уровнем prio от нескольких часов назад временно лишит новых сообщений с высоким значением prio, и для их правильного распределения потребуется более сложный метод


другой метод использует несколько очередей, по одной для каждого приоритета, и отбирает несколько из очереди с более высоким приоритетом для каждой из очереди с низким приоритетом

, этот последний метод действительно реально подходит только для небольшого количества приоритетов, в то время какПервый метод, который я предоставил, может обрабатывать произвольное количество приоритетов

...