Несколько потребителей в одной очереди RabbitMQ - PullRequest
0 голосов
/ 30 декабря 2018

Я следую этому руководству от RabbitMQ: https://www.rabbitmq.com/tutorials/tutorial-two-java.html. Я хочу эмулировать эту функцию с несколькими потоками в одной очереди.

Если я запускаю свои приемники до запуска отправителя, он работает как положено, как показано ниже:

[*] Rcvr1 Waiting for messages...
[*] Rcvr2 Waiting for messages...
[x] Rcvr1 Received 'Hello 0'
[x] Rcvr2 Received 'Hello 1'
[x] Rcvr1 Received 'Hello 2'
[x] Rcvr2 Received 'Hello 3'
[x] Rcvr1 Received 'Hello 4'
[x] Rcvr2 Received 'Hello 5'
[x] Rcvr1 Received 'Hello 6'
[x] Rcvr2 Received 'Hello 7'
[x] Rcvr1 Received 'Hello 8'
...

Однако при запуске моих приемников сначала только один поток получает сообщения (последнийпоток должен быть запущен):

[*] Rcvr2 Waiting for messages...
[*] Rcvr1 Waiting for messages...
[x] Rcvr1 Received 'Hello 9'
[x] Rcvr1 Received 'Hello 10'
[x] Rcvr1 Received 'Hello 11'
[x] Rcvr1 Received 'Hello 12'
[x] Rcvr1 Received 'Hello 13'
[x] Rcvr1 Received 'Hello 14'
[x] Rcvr1 Received 'Hello 15'
...

Интересно, что если я запускаю отправителя, затем запускаю получателя, как описано выше, затем снова запускаю отправителя (пока получатель обрабатывает первую партию).Первые отправленные сообщения обрабатываются последовательно, в то время как второй пакет обрабатывается параллельно, или, по крайней мере, с остальными потоками.: * 10101 *

 [*] Rcvr1 Waiting for messages...
 [*] Rcvr2 Waiting for messages...
 [x] Rcvr1 Received '[Batch 1] Hello 0'
 [x] Rcvr1 Received '[Batch 1] Hello 1'
 [x] Rcvr1 Received '[Batch 1] Hello 2'
 [x] Rcvr1 Received '[Batch 1] Hello 3'
 [x] Rcvr1 Received '[Batch 1] Hello 4'
 [x] Rcvr1 Received '[Batch 1] Hello 5'
 [x] Rcvr1 Received '[Batch 1] Hello 6'
 [x] Rcvr1 Received '[Batch 1] Hello 7'
 [x] Rcvr1 Received '[Batch 1] Hello 8'
 [x] Rcvr2 Received '[Batch 2] Hello 1'
 [x] Rcvr1 Received '[Batch 1] Hello 9'
 [x] Rcvr2 Received '[Batch 2] Hello 3'
 [x] Rcvr1 Received '[Batch 1] Hello 10'
 [x] Rcvr2 Received '[Batch 2] Hello 5'
 [x] Rcvr1 Received '[Batch 1] Hello 11'
 [x] Rcvr2 Received '[Batch 2] Hello 7'
 [x] Rcvr1 Received '[Batch 1] Hello 12'
 [x] Rcvr2 Received '[Batch 2] Hello 9'
 [x] Rcvr1 Received '[Batch 1] Hello 13'
 [x] Rcvr2 Received '[Batch 2] Hello 11'

Это очевидно возможно с RabbitMQ, я не уверен, что яделаю неправильно.Мой простой код выглядит следующим образом:

Отправитель

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            for(int x=0; x<100; x++) {
                String message = "Hello "+x;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

Получатель

package com.mawv.ingest.rabbitmq;

import com.rabbitmq.client.*;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ThreadPoolExecutor rcvrPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Runnable rcvr1 = () -> {
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                System.out.println(" [*] Rcvr1 Waiting for messages...");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    Envelope envelope = delivery.getEnvelope();
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Rcvr1 Received '" + message + "'");
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, true);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ex) { }

                };
                channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {  });

            } catch(Exception ex){
                ex.printStackTrace();
            }
        };
        Runnable rcvr2 = () -> {
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                System.out.println(" [*] Rcvr2 Waiting for messages...");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    Envelope envelope = delivery.getEnvelope();
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Rcvr2 Received '" + message + "'");
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, true);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception ex) {
                    }
                };
                channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
                });
            } catch(Exception ex){
                ex.printStackTrace();
            }
        };
        rcvrPool.execute(rcvr1);
        rcvrPool.execute(rcvr2);

    }
}

Я также связал этот пример в точности так, как они его описывают и видят то же самоеРезультаты.https://self -learning-java-tutorial.blogspot.com / 2015/09 / rabbitmq-один-продюсер-и-несколько.html

Я предполагаю, что что-то не так с моимнастроить.

Ответы [ 2 ]

0 голосов
/ 30 декабря 2018

Похоже, мне не хватало критической конфигурации канала.Это решило мою проблему:

channel.basicQos (1);

Это то, что RabbitMQ должен сказать об этом.

Справедливая отправка

Возможно, вы заметили, что отправка по-прежнему не работает так, как мы хотим.Например, в ситуации с двумя работниками, когда все нечетные сообщения тяжелые, а четные сообщения легкие, один работник будет постоянно занят, а другой вряд ли будет выполнять какую-либо работу.Что ж, RabbitMQ ничего об этом не знает и все равно будет отправлять сообщения равномерно.

Это происходит потому, что RabbitMQ просто отправляет сообщение, когда сообщение входит в очередь.Это не смотрит на количество неподтвержденных сообщений для потребителя.Он просто слепо отправляет каждое n-е сообщение n-му потребителю.

Чтобы победить это, мы можем использовать метод basicQos с параметром prefetchCount = 1.Это говорит RabbitMQ не передавать более одного сообщения работнику за раз.Или, другими словами, не отправляйте новое сообщение работнику, пока оно не обработает и не подтвердит предыдущее.Вместо этого он отправит его следующему работнику, который еще не занят.

0 голосов
/ 30 декабря 2018

Согласно RabbitMQ api:

"Хотя канал может использоваться несколькими потоками, важно убедиться, что только один поток выполняет команду одновременно. Одновременное выполнение команд, вероятно, приведет к тому, что ошибка UnexpectedFrameError будетthrown "

Прежде всего, я думаю, что вы должны использовать разные каналы для разных потоков.

Наконец, я думаю, что первый поток завершен, потому что он свободен, поэтому активен только второй и выполняетвся работаВ этой ситуации достаточно одного потока.

Посмотрите API-интерфейс ThreadPoolExecutor для Java 8:

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html

Например, вы можете найти:

"По умолчанию даже основные потоки изначально создаются и запускаются только при поступлении новых задач, но это можно динамически переопределить с помощью метода prestartCoreThread () или prestartAllCoreThreads (). Возможно, вы захотите предварительно запустить потоки, если создаете пул с не-empty queue "

и

" Если в пуле в настоящее время имеется больше потоков corePoolSize, избыточные потоки будут прерваны, если они простаивают больше, чем keepAliveTime (см. getKeepAliveTime (TimeUnit)). "

Вы должны использовать prestartAllCoreThreads () или prestartCoreThreads (), чтобы запустить основные потоки, даже когда они простаивают, или getKeepAliveTime (TimeUnit), чтобы поддерживать их живыми, даже если они простаивают.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...