QueueChannel
полностью основан на алгоритме FIFO. Если быть точным: то, что в памяти (по умолчанию) полностью основано на LinkedBlockingQueue
.
Все постоянные реализации основаны на семантике FIFO хранилища. Например, у JDBC один такой запрос:
@Override
public String getPollFromGroupQuery() {
return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " +
"where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " +
"order by CREATED_DATE, MESSAGE_SEQUENCE FETCH FIRST ROW ONLY";
}
Если емкость заполнена, выполняется эта логика:
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e, long timeout, TimeUnit unit)
, которая вызывается из QueueChannel.send()
, возвращая boolean
вукажите, требуется ли сообщение для очереди или нет.
Если вы хотите изменить сообщение заказа в обработанном канале очереди, вы можете указать свой собственный Queue
impl:
/**
* Create a channel with the specified queue.
*
* @param queue The queue.
*/
public QueueChannel(Queue<Message<?>> queue) {
В Spring Integration также есть PriorityChannel
: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-prioritychannel