Как работает порядок очередей, если емкость заполнена и приходят новые сообщения? - PullRequest
0 голосов
/ 04 ноября 2019

У меня есть очередь с неограниченной емкостью. Однако я могу ограничить емкость, мой вопрос в том, в каком порядке будут обрабатываться сообщения. Если в очереди указано 100 000 сообщений, новое сообщение все еще поступает. будет ли выходной канал получать новое сообщение или выберет его из очереди в порядке FIFO?

<channel id="DeliveryProcessChannel">
 <queue/>
</channel>

<aggregator input-channel="DeliveryProcessChannel" output- 
   channel="EmailDispatchChannel"
   discard-channel="EmailDispatchChannel"
   expire-groups-upon-completion="true"    
   send-partial-result-on-expiry="true"
   group-timeout="1000"
   correlation-strategy-expression="T(Thread).currentThread().id"
   release-strategy-expression="size() == 1000">
       <poller max-messages-per-poll="1000" fixed-rate="1000"/>
</aggregator>

1 Ответ

1 голос
/ 04 ноября 2019

QueueChannel полностью основан на алгоритме FIFO. Если быть точным: то, что в памяти (по умолчанию) полностью основано на LinkedBlockingQueue.

Все постоянные реализации основаны на семантике FIFO хранилища. Например, у JDBC один такой запрос:

@Override
public String getPollFromGroupQuery() {
    return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " +
            "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " +
            "order by CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY";
}

Если емкость заполнена, выполняется эта логика:

/**
 * Inserts the specified element into this queue, waiting up to the
 * specified wait time if necessary for space to become available.
 *
 * @param e the element to add
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 *        {@code timeout} parameter
 * @return {@code true} if successful, or {@code false} if
 *         the specified waiting time elapses before space is available
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
boolean offer(E e, long timeout, TimeUnit unit)

, которая вызывается из QueueChannel.send(), возвращая boolean вукажите, требуется ли сообщение для очереди или нет.

Если вы хотите изменить сообщение заказа в обработанном канале очереди, вы можете указать свой собственный Queue impl:

/**
 * Create a channel with the specified queue.
 *
 * @param queue The queue.
 */
public QueueChannel(Queue<Message<?>> queue) {

В Spring Integration также есть PriorityChannel: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-prioritychannel

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...