В моем приложении у меня есть 3 класса:
- Компания, которая нанимает работников для любого из 3 заданий
- Работники, каждый может выполнять 2 задания
- Администратор, который получает копии всех сообщений в программа и может отправлять сообщения всем компаниям, всем работникам или просто всем
Я использую work.companies.companyName
для ключей компаний и work.workers.workerName
для рабочих ключей, они оба используют обмен по умолчанию и очередь для связи. Администратор получает сообщения с admin
Topi c Exchange.
Проблема с администратором -> общение всех остальных. Он работает точно так же, как прямой обмен - я могу получить названия компаний и работников, даже такие как "#", "company1. #" Et c. и они ничего не получат, если только в Администраторе я явно не отправлю сообщение с ключом типа «work.companies.company1».
Я бы хотел использовать, например, «work.companies. #» для отправки сообщение для всех компаний. Что я делаю не так?
Администратор. java:
public class Administrator
{
public static void main(String[] args) throws IOException, TimeoutException
{
new Thread(new TopicListener("admin", ign -> {})).start();
TopicWriter writer = new TopicWriter();
// lots of code
TopicListener. java:
public class TopicListener implements Runnable
{
private final String EXCHANGE_NAME = "space";
private String key;
private Consumer<String> msgHandler;
public TopicListener(String key, Consumer<String> msgHandler)
{
this.key = key;
this.msgHandler = msgHandler;
}
@Override
public void run()
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, key);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("Received: \"" + msg + "\"");
msgHandler.accept(msg);
}
};
channel.basicConsume(queueName, true, consumer);
}
catch (IOException | TimeoutException e)
{ e.printStackTrace(); }
}
}
TopicWriter. java:
public class TopicWriter
{
private final String EXCHANGE_NAME = "space";
private final Channel channel;
public TopicWriter() throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
this.channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
}
public void send(String msg, String key) throws IOException
{
channel.basicPublish(
EXCHANGE_NAME,
key,
null,
msg.getBytes(StandardCharsets.UTF_8));
}
}
Компания. java содержит:
new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();
Рабочий. java содержит:
new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();