Я следую этому руководству от RabbitMQ: https://www.rabbitmq.com/tutorials/tutorial-two-java.html. Я хочу эмулировать эту функцию с несколькими потоками в одной очереди.
Если я запускаю свои приемники до запуска отправителя, он работает как положено, как показано ниже:
[*] Rcvr1 Waiting for messages...
[*] Rcvr2 Waiting for messages...
[x] Rcvr1 Received 'Hello 0'
[x] Rcvr2 Received 'Hello 1'
[x] Rcvr1 Received 'Hello 2'
[x] Rcvr2 Received 'Hello 3'
[x] Rcvr1 Received 'Hello 4'
[x] Rcvr2 Received 'Hello 5'
[x] Rcvr1 Received 'Hello 6'
[x] Rcvr2 Received 'Hello 7'
[x] Rcvr1 Received 'Hello 8'
...
Однако при запуске моих приемников сначала только один поток получает сообщения (последнийпоток должен быть запущен):
[*] Rcvr2 Waiting for messages...
[*] Rcvr1 Waiting for messages...
[x] Rcvr1 Received 'Hello 9'
[x] Rcvr1 Received 'Hello 10'
[x] Rcvr1 Received 'Hello 11'
[x] Rcvr1 Received 'Hello 12'
[x] Rcvr1 Received 'Hello 13'
[x] Rcvr1 Received 'Hello 14'
[x] Rcvr1 Received 'Hello 15'
...
Интересно, что если я запускаю отправителя, затем запускаю получателя, как описано выше, затем снова запускаю отправителя (пока получатель обрабатывает первую партию).Первые отправленные сообщения обрабатываются последовательно, в то время как второй пакет обрабатывается параллельно, или, по крайней мере, с остальными потоками.: * 10101 *
[*] Rcvr1 Waiting for messages...
[*] Rcvr2 Waiting for messages...
[x] Rcvr1 Received '[Batch 1] Hello 0'
[x] Rcvr1 Received '[Batch 1] Hello 1'
[x] Rcvr1 Received '[Batch 1] Hello 2'
[x] Rcvr1 Received '[Batch 1] Hello 3'
[x] Rcvr1 Received '[Batch 1] Hello 4'
[x] Rcvr1 Received '[Batch 1] Hello 5'
[x] Rcvr1 Received '[Batch 1] Hello 6'
[x] Rcvr1 Received '[Batch 1] Hello 7'
[x] Rcvr1 Received '[Batch 1] Hello 8'
[x] Rcvr2 Received '[Batch 2] Hello 1'
[x] Rcvr1 Received '[Batch 1] Hello 9'
[x] Rcvr2 Received '[Batch 2] Hello 3'
[x] Rcvr1 Received '[Batch 1] Hello 10'
[x] Rcvr2 Received '[Batch 2] Hello 5'
[x] Rcvr1 Received '[Batch 1] Hello 11'
[x] Rcvr2 Received '[Batch 2] Hello 7'
[x] Rcvr1 Received '[Batch 1] Hello 12'
[x] Rcvr2 Received '[Batch 2] Hello 9'
[x] Rcvr1 Received '[Batch 1] Hello 13'
[x] Rcvr2 Received '[Batch 2] Hello 11'
Это очевидно возможно с RabbitMQ, я не уверен, что яделаю неправильно.Мой простой код выглядит следующим образом:
Отправитель
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for(int x=0; x<100; x++) {
String message = "Hello "+x;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
Получатель
package com.mawv.ingest.rabbitmq;
import com.rabbitmq.client.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ThreadPoolExecutor rcvrPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Runnable rcvr1 = () -> {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Rcvr1 Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
Envelope envelope = delivery.getEnvelope();
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Rcvr1 Received '" + message + "'");
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, true);
try {
Thread.sleep(1000);
} catch (Exception ex) { }
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
} catch(Exception ex){
ex.printStackTrace();
}
};
Runnable rcvr2 = () -> {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Rcvr2 Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
Envelope envelope = delivery.getEnvelope();
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Rcvr2 Received '" + message + "'");
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, true);
try {
Thread.sleep(1000);
} catch (Exception ex) {
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
} catch(Exception ex){
ex.printStackTrace();
}
};
rcvrPool.execute(rcvr1);
rcvrPool.execute(rcvr2);
}
}
Я также связал этот пример в точности так, как они его описывают и видят то же самоеРезультаты.https://self -learning-java-tutorial.blogspot.com / 2015/09 / rabbitmq-один-продюсер-и-несколько.html
Я предполагаю, что что-то не так с моимнастроить.