Узел Js / Typescript - Потребитель AMQP - PullRequest
0 голосов
/ 29 августа 2018

Я впервые пробую свои силы в файле node.js / typescript, и у меня возникли некоторые проблемы с поиском очереди для кроликов.

Код:

let amqp = require('amqp');

let connection = amqp.createConnection({url: "amqp://" + RABBITMQ_USER + ":" + RABBITMQ_PASSWORD + "@" + RABBITMQ_HOST + ":" + RABBITMQ_PORT + RABBITMQ_VHOST});

connection.on('ready', function() {
    connection.exchange(RABBITMQ_WORKER_EXCHANGE, function (exchange) {
        connection.queue(RABBITMQ_QUEUE, function (queue) {
            queue.bind(exchange, function() {
                queue.publish(function (message) {
                    console.log('subscribed to queue');
                    let encoded_payload = unescape(message.data);
                    let payload = JSON.parse(encoded_payload);
                    console.log('Received a message:');
                    console.log(payload);
                })
            })
        })
    })
})

Кажется, что он подключается к серверу amqp и не выдает ошибок, но он просто сидит и ничего не потребляет. Есть ли шаг, который я пропускаю?

Любая помощь будет принята с благодарностью, Спасибо.

1 Ответ

0 голосов
/ 29 августа 2018

Вот мое решение, которое работает на основе учебника по JS от amqp. https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

Возможно, не в соответствии со стандартами TypeScript, не стесняйтесь поправлять меня, если есть лучший способ.

#!/usr/bin/env node

require('dotenv').config();
import amqp = require('amqplib/callback_api');
import db = require('./database');

amqp.connect({
    protocol: process.env.RABBITMQ_PROTOCOL,
    hostname: process.env.RABBITMQ_HOST,
    port: process.env.RABBITMQ_PORT,
    username: process.env.RABBITMQ_USER,
    password: process.env.RABBITMQ_PASSWORD,
    vhost: process.env.RABBITMQ_VHOST
}, function(err, conn) {
    conn.createChannel(function (err, ch) {
        // set exchange that is being used
        ch.assertExchange(process.env.RABBITMQ_WORKER_EXCHANGE, 'direct', {durable: true});
        // set queue that is being used
        ch.assertQueue(process.env.RABBITMQ_QUEUE, {durable: true}, function (err, q) {
            console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
            // bind the queue to the exchange
            ch.bindQueue(q.queue, process.env.RABBITMQ_WORKER_EXCHANGE, '');
            // consume from the queue, one message at a time.
            ch.consume(q.queue, function (msg) {
                console.log("Message received: %s", msg.content.toString());
                //save message to db
                db.store(msg.content.toString()).then(function() {
                    //acknowledge receipt of message to amqp
                    console.log("Acknowledging message");
                    ch.ack(msg, true);
                });
            }, {noAck: false});
        });
    });
});
...