Взаимодействие с Джанго / Сельдерей с Явы - PullRequest
13 голосов
/ 04 августа 2011

Наша компания имеет веб-сайт на основе Python и несколько рабочих узлов на основе Python, которые взаимодействуют через Django / Celery и RabbitMQ. У меня есть приложение на основе Java, которое должно отправлять задачи работникам из Celery. Я могу отправлять задания в RabbitMQ из Java просто отлично, но работники из Celery никогда не забирают их. Глядя на захваты пакетов обоих типов представлений о работе, есть различия, но я не могу понять, как их учесть, потому что большая их часть является двоичной, и я не могу найти документацию по декодированию. Есть ли у кого-нибудь здесь какие-либо ссылки или опыт совместной работы Java / RabbitMQ и Celery?

1 Ответ

12 голосов
/ 05 августа 2011

Я нашел решение. Библиотека Java для RabbitMQ относится к биржам / очередям / маршрутным ключам. В Celery имя очереди фактически соответствует обмену, указанному в библиотеке Java. По умолчанию очередь для сельдерея просто «сельдерей». Если ваши настройки Django определяют очередь с именем «myqueue», используя следующий синтаксис:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

Тогда код на основе Java должен сделать что-то вроде следующего:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

Оказывается, это просто различие в терминологии.

...