Возможно ли, чтобы Java прослушивала несколько очередей RabbitMQ? - PullRequest
1 голос
/ 04 марта 2012

Обновление: я думал, что мое объяснение может быть слишком многословным, поэтому я думаю, что простой способ думать о том, что я пытаюсь сделать, это: у меня есть несколько рабочих узлов, которые я хочу использовать их для разных задач (и тех разных задачприходят из разных очередей).В настоящее время я знаю только, как заставить их слушать одну очередь, связанную с одним типом работы, но я хочу, чтобы они слушали разные очереди, так как разная работа возникает, один и тот же кластер узлов может их обрабатывать.Надеюсь, это станет яснее.


Привет Всем, я подозреваю, что это возможно, но я не могу понять, как именно это сделать.Я просмотрел руководства на сайте rabbitmq, и они были действительно полезны и сделали то, что я хотел, за исключением того, что они не показали, как прослушивать несколько очередей в одной и той же программе.

Структура моей программы в основном состоит из нескольких этапов.. Например, фаза 1 собирает много данных, а затем фаза 2 обрабатывает их и загружает их в базу данных, фаза 3 анализирует их на основе других данных и т. д. Каждая фаза не может начаться, пока не будет завершена предыдущая фаза, и я хотел использоватьсистема очереди для использования нескольких машин для более быстрого завершения каждого этапа (поэтому все потребители работают на этапе 1, затем, как только все они будут выполнены, они вместе переходят на этап 2 и т. д.).

Я думаю, что не могупросто делайте каждую фазу один раз, потому что очередь может быть пустой, и компьютер переместится в следующую очередь, и я не могу знать, пуста ли она, потому что вся работа выполнена, или если она выполнена, потому что мы не начали помещать работу вОчередь еще.Поэтому я подумал (поправьте меня, если я ошибаюсь), лучше было бы прослушать все очереди, связанные со всеми фазами, и, когда работа помещается в phase1Queue, она работает над ней, а если работа помещается в phase2Queue, она сразу же работает с ней (У меня есть другие процессы вне процесса, который я описываю, который отслеживает, когда каждая фаза завершена, и настраивает для следующей фазы).Надеюсь, что это имеет смысл.

Код в образце очереди полезен для потребителя, слушающего одну очередь, но как я могу заставить его прослушивать несколько (и вызывать разные программы в зависимости от разных очередей).Если для этого уже есть функция, то это круто, но я как бы ищу логику, которую я могу использовать для ее реализации в Java (в худшем случае я думал о запуске 5 отдельных программ, прослушивающих каждую очередь, но я пытаюсь выяснить,если есть лучший способ, наличие одного приложения со всей моей работой упростит управление распространением).

Спасибо!

ps, если это поможет, вот код потребителя, который работает для rabbitmq (но, как вы можете видеть, он определяет только одну очередь):

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
      doWork(message); 
      System.out.println(" [x] Done" );

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}

Обновление: согласно комментарию, процесс координатора очень прост.Это просто программа, которая отслеживает из Phase1Queue, и когда эта очередь пуста, она просто запускает процесс заполнения Phase2Queue и т. Д.

1 Ответ

1 голос
/ 05 марта 2012

Другой подход состоит в том, чтобы разделить разные этапы между различными модулями / службами / процессами и одновременно запустить их все.

Потребитель каждой фазы слушает свою очередь.Каждый этап отвечает за создание сообщений для очереди следующего этапа.

Таким образом вы уменьшили сложность, выполняя отдельные обязанности.Каждый процесс потребляет из определенной очереди и производит для другой очереди.Затем, при необходимости, вы можете масштабировать эти отдельные процессы вверх и / или из них.

Это шаблон, который мы используем.У нас есть начальная работа, из которой в конечном итоге создается до 100 раз больше дополнительных рабочих мест.Начальные задания очень малы и быстро выполняются, в то время как подзадачи - это потенциально долго выполняющиеся запросы, которые мы обслуживаем небольшой группой облачных экземпляров.Эти задания затем возвращают свои результаты через другую очередь, результаты которой затем сопоставляются и добавляются в базу данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...