Как отделить сердцебиение от собственного потока для соединения RabbitMQ - PullRequest
0 голосов
/ 31 января 2019

У меня есть процесс, который использует RabbitMQ и NodeJS для обработки изображений.В связи с интенсивной задачей, я думаю, что у меня та же проблема, что и ссылка здесь https://github.com/squaremo/amqp.node/issues/261

Я пытаюсь выяснить, как реализовать последний комментарий по этой проблеме.

"Да. NodeJS является однопоточным, и если вы используете этот поток, чтобы делать что-то в течение длительного времени, больше ничего не произойдет. Как предполагает @michaelklishin, известное решение этой общей проблемы - использование дочернего процесса или кластера.module. "

EDIT:

I Обновлен код, приведенный ниже, с примером того, как, по моему мнению, я могу сделать это с помощью модуля amqp-connection-manager.Прямо сейчас я использую глобальную переменную для хранения фактического сообщения, чтобы иметь возможность подтверждения.Я предполагаю, что есть лучший способ сделать это.

//Used to be an example for how to keep the connection thread and the working thread separate
//in order to fix the issue of missing heartbeat intervals due to processing on the same thread

const cluster = require('cluster');
var amqp = require('amqp-connection-manager');
var config = require('./config.json');

var working_queue = "Test_Queue";

//DONT REALLY DO THIS
var rabbit_msg_data;


//******* CLUSTER SETUP *******
// This will spawn off the number of worker for this process.
if (cluster.isMaster) {
    console.log("Master "+process.pid+" is running");

    worker = cluster.fork();

    cluster.on('exit', (worker, code, signal) => {
        if(signal)
        {
            console.log("worker was killed by signal: "+signal);
            console.log(worker);
        }
        else if (code !== 0)
        {
            console.log("worker exited with error code: "+code);
            console.log(worker);
        }
        else
        {
            console.log("Worker "+worker.process.pid+" exited successfully");
            console.log(worker);
            //Not sure if this works this way or if I need to put this worker into variables
        }
    });

    //testing sending a message back and forth
    // setTimeout(function() {
    //  worker.send("I got a request!");
    // }, 1000);

    //******** RABBIT MQ CONNECTION **********

    // Create a connection manager to rabbitmq
    var connection = amqp.connect(config.rabbit_connections_arr, {json: true, heartbeatIntervalInSeconds: 2});
    connection.on('connect', function() {
        console.log('Connected to rabbitmq');
    });
    connection.on('disconnect', function(params) {
        console.log('Disconnected from rabbitmq:', params.err.stack);
    });

    // Set up a channel listening for messages in the queue.
    var channelWrapper_listening = connection.createChannel({
        setup: function(channel) {
            // `channel` here is a regular amqplib `ConfirmChannel`.
            return Promise.all([
                channel.assertQueue(working_queue, {durable: true}),
                channel.prefetch(1),
                channel.consume(working_queue, function(data){
                    rabbit_msg_data = data;
                    worker.send(data.content.toString());             
                }, requeue = false)
            ]);
        }
    });


    worker.on('message', function(msg){
        // console.log("Worker to Master (ack): ", msg.content.toString());
        console.log("Worker to Master (ack): ", msg);
        //console.log(msg.content.toString());
        channelWrapper_listening.ack(rabbit_msg_data);
    });



}
else //All worker processes (MAIN LOGIC)
{
    console.log("Worker "+process.pid+" started");

    process.on('message',function(msg){
        console.log("Master to Worker (working): ", msg);

        //send msg back when done working on it.
        setTimeout(function() {
            process.send(msg);
        }, 5000);
    });
}
...