ActiveMQ JMSXGroupID не работает должным образом - PullRequest
0 голосов
/ 04 февраля 2020

Я использую этот скрипт для локального запуска ActiveMQ:

docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq

Запускает версию AMQ 5.15.6. Я соединяюсь с AMQ с STOMP через websockets (v1.2). Через веб-консоль в моей «тестовой» очереди я создаю два сообщения с одинаковой группой «test_grp». Я запускаю два процесса, и каждый из них запускает одинаковые логи c:

  • connect
  • подписка на "/ queue / test" с activemq.prefetchSize: 1 и ack: client-individual заголовками
  • в новом сообщении sleep 5se c и отправка подтверждения для сообщения

Оба процесса получают сообщения немедленно, в то время как второе сообщение должно быть получено тем же процессом или, по крайней мере, оно должно быть получено после первый - ACKed.

Кроме того, если я запускаю только один процесс / подписку с заголовком activemq.prefetchSize: 2, то этот процесс получает сразу два сообщения, а не последовательно, после первого ACKed.

Так что, кажется, что JMSXGroupID не влияет на обработку сообщений. Возможно ли, что что-то не настроено правильно на стороне брокера?

Я уверен, что сообщения не ACKed автоматически, потому что они все еще находятся в очереди, пока потребитель не подтвердит их.


Через некоторое время Тестирование Я выяснил, что группировка по двум потребителям работает. Однако для одного потребителя с activemq.prefetchSize: 2 он сразу получает два сообщения из одной группы. Это ожидаемое поведение? Если да, то кажется, что если кто-то хочет обрабатывать сообщения в порядке, он должен установить activemq.prefetchSize на 1 при подписке?

Вот фрагмент кода для проверки (Node.js 12.x и требует пакетов @stomp/stompjs и websocket):

Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');

function createClient() {
  return new Client({
    brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
  });
}

function createLog(name) {
  return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}

function createConsumer(name) {
  const log = createLog(name);
  const client = createClient();

  client.onConnect = () => {
    log('CLIENT_CONNECTED');
    client.subscribe('/queue/test', (msg) => {
      log('RECEIVED_MESSAGE');
      setTimeout(() => {
        msg.ack();
        log('ACKED_MESSAGE');
      }, 10000);
    }, {
      'activemq.prefetchSize': '2',
      ack: 'client-individual',
    });
  }

  client.activate();
}

function publishMessages() {
  const log = createLog();
  const client = createClient();

  client.onConnect = () => {
    client.publish({
      destination: '/queue/test',
      headers: {
        persistent: true,
        JMSXGroupID: 'grp',
      },
      body: 'test message 1',
    });
    client.publish({
      destination: '/queue/test',
      headers: {
        persistent: true,
        JMSXGroupID: 'grp',
      },
      body: 'test message 2',
    });
  };

  client.activate();
}

createConsumer('A');
createConsumer('B');

setTimeout(() => {
  publishMessages();
}, 2000);

Вывод:

22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE

Вот заголовки, полученные клиентом STOMP (информация о группе отсутствует):

headers: {
    timestamp: '1580796287376',
    persistent: 'true',
    'message-id': 'ID:04f803c080ac-46709-1580756713304-3:66:-1:1:2',
    priority: '4',
    subscription: 'sub-0',
    ack: 'ID:04f803c080ac-46709-1580756713304-70:2',
    destination: '/queue/test',
    expires: '0',
    'content-length': '14'
  },

Это странно, потому что кажется, что JMSXGroupID не гарантирует обработку заказа в группе. Это только гарантирует, что сообщения будут доставлены одному и тому же потребителю в том же порядке, но сообщение B может быть доставлено потребителю до того, как оно подтвердит сообщение A (или даже «начнет» обработку A) (при условии, что у потребителя activemq.prefetchSize > 1). Я не понимаю этого, потому что, поскольку потребитель использует режим ack: client-individual, это означает, что до тех пор, пока потребитель не отправит сообщение ACK, не может рассматриваться как доставленный брокером. Так почему же брокер отправляет сообщение из группы, для которой он знает, было ли доставлено предыдущее сообщение из той же группы? Может быть, можно как-то настроить брокер ActiveMQ, чтобы предотвратить это?

Другое решение было бы каким-то образом получить заголовок JMSXGroupID вместе с сообщением от брокера, который не работает (по крайней мере, в STOMP) - в таком случае, брокер может иметь локальную мини-очередь (и) для локального упорядочения сообщений из той же группы в правильном порядке.

Мой пример использования: я хочу иметь очередь, которая является входным каналом для микросервиса (/ queue / my -служба). Другие микросервисы будут отправлять ему сообщения / события / команды. Некоторые из них могут иметь JMSXGroupID, другие - нет. То, что я хочу, это иметь одну подписку с некоторым параллелизмом (activemq.prefetchSize > 1). Таким образом, если 6 сообщений (A1, A2, A3, B, C, D) получены «сразу» и три из них относятся к одной группе (A1, A3, A3), а другие находятся в разных группах или не принадлежат группа вообще, тогда потребитель должен обрабатывать параллельно:

  • A1 -> A2 -> A3 (последовательно)
  • B
  • C
  • D

Я сообщил об ошибке (похоже на ошибку, о которой упоминал @Justin Bertram) в связи с отсутствующим заголовком JMSXGroupID в сообщении здесь: https://issues.apache.org/jira/browse/AMQ-7395

1 Ответ

1 голос
/ 04 февраля 2020

Вы видите ожидаемое поведение. Если для activemq.prefetchSize установлено значение больше 1, то брокер будет отправлять клиенту более 1 сообщения за раз. Поскольку stomp js будет вызывать функцию обратного вызова, которую вы передаете subscribe при получении сообщения, вам придется самостоятельно контролировать порядок подтверждения или просто установить activemq.prefetchSize на 1.

Для адресации. некоторые из ваших указанных c баллов ...

... JMSXGroupID не гарантирует обработку заказа в группе. Это только гарантирует, что сообщения будут доставлены одному и тому же потребителю в том же порядке ...

Это совершенно верно. Группировка сообщений с заголовком JMSXGroupID только гарантирует, что сообщения в той же группе будут доставлены конкретному потребителю в том порядке, в котором они были получены посредником. Как только это будет сделано, порядок подтверждения будет принадлежать самому клиенту.

Вообще говоря, предполагается, что параллелизм обработки сообщений исходит от нескольких одновременно работающих потребителей, а не от одного потребителя (или группы потребителей), который обрабатывает сообщения одновременно. сам. В этом общем случае доставки сгруппированных сообщений одному потребителю достаточно, чтобы гарантировать порядок, поскольку он будет обрабатывать сообщения последовательно.

Так почему брокер отправляет сообщение из группы, для которой он знает, является ли предыдущее сообщение тем же группа была доставлена?

Посредник отправляет несколько сообщений из одной группы, потому что вы попросили об этом, установив для activemq.prefetchSize значение больше 1.

...