Я использую этот скрипт для локального запуска ActiveMQ:
docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq
Запускает версию AMQ 5.15.6. Я соединяюсь с AMQ с STOMP через websockets (v1.2). Через веб-консоль в моей «тестовой» очереди я создаю два сообщения с одинаковой группой «test_grp». Я запускаю два процесса, и каждый из них запускает одинаковые логи c:
- connect
- подписка на "/ queue / test" с
activemq.prefetchSize: 1
и ack: client-individual
заголовками - в новом сообщении sleep 5se c и отправка подтверждения для сообщения
Оба процесса получают сообщения немедленно, в то время как второе сообщение должно быть получено тем же процессом или, по крайней мере, оно должно быть получено после первый - ACKed.
Кроме того, если я запускаю только один процесс / подписку с заголовком activemq.prefetchSize: 2
, то этот процесс получает сразу два сообщения, а не последовательно, после первого ACKed.
Так что, кажется, что JMSXGroupID не влияет на обработку сообщений. Возможно ли, что что-то не настроено правильно на стороне брокера?
Я уверен, что сообщения не ACKed автоматически, потому что они все еще находятся в очереди, пока потребитель не подтвердит их.
Через некоторое время Тестирование Я выяснил, что группировка по двум потребителям работает. Однако для одного потребителя с activemq.prefetchSize: 2
он сразу получает два сообщения из одной группы. Это ожидаемое поведение? Если да, то кажется, что если кто-то хочет обрабатывать сообщения в порядке, он должен установить activemq.prefetchSize
на 1
при подписке?
Вот фрагмент кода для проверки (Node.js 12.x и требует пакетов @stomp/stompjs
и websocket
):
Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');
function createClient() {
return new Client({
brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
});
}
function createLog(name) {
return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}
function createConsumer(name) {
const log = createLog(name);
const client = createClient();
client.onConnect = () => {
log('CLIENT_CONNECTED');
client.subscribe('/queue/test', (msg) => {
log('RECEIVED_MESSAGE');
setTimeout(() => {
msg.ack();
log('ACKED_MESSAGE');
}, 10000);
}, {
'activemq.prefetchSize': '2',
ack: 'client-individual',
});
}
client.activate();
}
function publishMessages() {
const log = createLog();
const client = createClient();
client.onConnect = () => {
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 1',
});
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 2',
});
};
client.activate();
}
createConsumer('A');
createConsumer('B');
setTimeout(() => {
publishMessages();
}, 2000);
Вывод:
22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE
Вот заголовки, полученные клиентом STOMP (информация о группе отсутствует):
headers: {
timestamp: '1580796287376',
persistent: 'true',
'message-id': 'ID:04f803c080ac-46709-1580756713304-3:66:-1:1:2',
priority: '4',
subscription: 'sub-0',
ack: 'ID:04f803c080ac-46709-1580756713304-70:2',
destination: '/queue/test',
expires: '0',
'content-length': '14'
},
Это странно, потому что кажется, что JMSXGroupID не гарантирует обработку заказа в группе. Это только гарантирует, что сообщения будут доставлены одному и тому же потребителю в том же порядке, но сообщение B может быть доставлено потребителю до того, как оно подтвердит сообщение A (или даже «начнет» обработку A) (при условии, что у потребителя activemq.prefetchSize > 1
). Я не понимаю этого, потому что, поскольку потребитель использует режим ack: client-individual
, это означает, что до тех пор, пока потребитель не отправит сообщение ACK, не может рассматриваться как доставленный брокером. Так почему же брокер отправляет сообщение из группы, для которой он знает, было ли доставлено предыдущее сообщение из той же группы? Может быть, можно как-то настроить брокер ActiveMQ, чтобы предотвратить это?
Другое решение было бы каким-то образом получить заголовок JMSXGroupID вместе с сообщением от брокера, который не работает (по крайней мере, в STOMP) - в таком случае, брокер может иметь локальную мини-очередь (и) для локального упорядочения сообщений из той же группы в правильном порядке.
Мой пример использования: я хочу иметь очередь, которая является входным каналом для микросервиса (/ queue / my -служба). Другие микросервисы будут отправлять ему сообщения / события / команды. Некоторые из них могут иметь JMSXGroupID, другие - нет. То, что я хочу, это иметь одну подписку с некоторым параллелизмом (activemq.prefetchSize > 1
). Таким образом, если 6 сообщений (A1, A2, A3, B, C, D) получены «сразу» и три из них относятся к одной группе (A1, A3, A3), а другие находятся в разных группах или не принадлежат группа вообще, тогда потребитель должен обрабатывать параллельно:
- A1 -> A2 -> A3 (последовательно)
- B
- C
- D
Я сообщил об ошибке (похоже на ошибку, о которой упоминал @Justin Bertram) в связи с отсутствующим заголовком JMSXGroupID в сообщении здесь: https://issues.apache.org/jira/browse/AMQ-7395