Использование ActiveMQ с более чем одним экземпляром потребителя - PullRequest
0 голосов
/ 13 октября 2018

В нашей архитектуре у нас есть 2 или более контейнера для каждого приложения для задач с высокой доступностью.Мы используем activeMQ, и я хотел бы реализовать следующее поведение:

  1. Отправка сообщения в очередь.
  2. Это сообщение будет использоваться только * одним * контейнером (первое, которое его прочтет).
  3. Как только сообщение успешно обработано, потребитель обновит это сообщение, и его можно будет подтвердить и пренебречь.
  4. Я попытался использовать транзакцию с коммитом и использовать CLIENT_ACKNOWLEDGE, однако в обоих случаях оба потребителяполучил сообщение и обработал его.

Я хочу, чтобы только один потребитель обрабатывал каждое сообщение в зависимости от доступности.Наша реализация на Java.

Пожалуйста, поделитесь способом ее реализации.

Вот мой пример кода

 final Connection consumerConnection = connectionFactory.createConnection();
    consumerConnection.start();
    // Create a session.
    final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    final Destination consumerDestination = consumerSession.createQueue(queueName);


    // Create a message consumer from the session to the queue.
    final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
    // Begin to wait for messages.
    Queue queue = consumerSession.createQueue(queueName);
    QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
    Enumeration msgs = queueBrowser.getEnumeration();
    while (msgs.hasMoreElements()) {
        //do your things here

        ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();

        if (message == null)
            continue;

        //handle message
        System.out.println("Message received in : " + message);
        try {
            String text = message.getText();
            JSONObject messageJson = new JSONObject(text);
            consumer.receive(1000);

            String responseString = handleMessage(messageJson);

            message.acknowledge();

Спасибо, Моше

1 Ответ

0 голосов
/ 14 октября 2018

Проблема здесь в том, что вы подтверждаете сообщение из экземпляра QueueBrowser, которое не будет иметь никакого влияния, поскольку браузеры предназначены только для просмотра сообщений и не используют их.

...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
    ...
    ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
    ...
    try {
        ...
        message.acknowledge(); // this does nothing

Вы фактически создают реальное потребление и вызывают consumer.receive(1000), но вы отбрасываете экземпляр Message, который возвращает receive().Это экземпляр Message, который необходимо подтвердить для фактического использования сообщения из очереди.

...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
    ...
    try {
        ...
        Message actualMessage = consumer.receive(1000); // acknowledge this!
        ...

Еще одно важное замечание ... Сообщения, которые получает браузер очереди, не обязательно являются статическим снимкомочереди.Поэтому опасно предполагать, что ваш вызов consumer.receive(1000) на самом деле является тем же сообщением, которое пришло из браузера очереди.По моему мнению, вам следует переработать эту логику, чтобы она работала только с MessageConsumer и отбросить QueueBrowser.

...