RabbitMQ, использующий Direct Exchange, когда было указано Topi c - PullRequest
0 голосов
/ 26 марта 2020

В моем приложении у меня есть 3 класса:
- Компания, которая нанимает работников для любого из 3 заданий
- Работники, каждый может выполнять 2 задания
- Администратор, который получает копии всех сообщений в программа и может отправлять сообщения всем компаниям, всем работникам или просто всем

Я использую work.companies.companyName для ключей компаний и work.workers.workerName для рабочих ключей, они оба используют обмен по умолчанию и очередь для связи. Администратор получает сообщения с admin Topi c Exchange.

Проблема с администратором -> общение всех остальных. Он работает точно так же, как прямой обмен - я могу получить названия компаний и работников, даже такие как "#", "company1. #" Et c. и они ничего не получат, если только в Администраторе я явно не отправлю сообщение с ключом типа «work.companies.company1».
Я бы хотел использовать, например, «work.companies. #» для отправки сообщение для всех компаний. Что я делаю не так?

Администратор. java:

public class Administrator
{
    public static void main(String[] args) throws IOException, TimeoutException
    {
        new Thread(new TopicListener("admin", ign -> {})).start();
        TopicWriter writer = new TopicWriter();
    // lots of code

TopicListener. java:

public class TopicListener implements Runnable
{
    private final String EXCHANGE_NAME = "space";
    private String key;
    private Consumer<String> msgHandler;

    public TopicListener(String key, Consumer<String> msgHandler)
    {
        this.key = key;
        this.msgHandler = msgHandler;
    }

    @Override
    public void run()
    {
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, key);

            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received: \"" + msg + "\"");
                    msgHandler.accept(msg);
                }
            };

            channel.basicConsume(queueName, true, consumer);
        }
        catch (IOException | TimeoutException e)
        { e.printStackTrace(); }
    }
}

TopicWriter. java:

public class TopicWriter
{
    private final String EXCHANGE_NAME = "space";
    private final Channel channel;

    public TopicWriter() throws IOException, TimeoutException
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        this.channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    }

    public void send(String msg, String key) throws IOException
    {
        channel.basicPublish(
                EXCHANGE_NAME,
                key,
                null,
                msg.getBytes(StandardCharsets.UTF_8));
    }
}

Компания. java содержит:

new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();

Рабочий. java содержит:

new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();

1 Ответ

0 голосов
/ 28 марта 2020

Я выяснил, в чем проблема: я пытался отправить сообщение всем, кто использует Topi c, где в RabbitMQ Topi c используется для указания, кто должен получить сообщение. «#» Или «*» должны использоваться в объявлении ключа очереди, а не при отправке сообщения с данным ключом.

...