NodeJS | Cluster: Как отправить данные от мастера всем или одному ребенку / работникам? - PullRequest
23 голосов
/ 16 декабря 2011

У меня есть рабочий (фондовый) скрипт из узла

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numReqs++;
      }
    });
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

В приведенном выше сценарии я могу с легкостью отправлять данные от работника к основному процессу. Но как отправить данные от мастера к работнику / работникам? С примерами, если это возможно.

Ответы [ 5 ]

39 голосов
/ 16 декабря 2011

Поскольку cluster.fork реализован поверх child_process.fork , вы можете отправлять сообщения от мастера к работнику, используя worker.send({ msg: 'test' }), а от работника к мастеру - process.send({ msg: 'test' });. Вы получаете такие сообщения: worker.on('message', callback) (от работника к хозяину) и process.on('message', callback); (от мастера к работнику).

Вот мой полный пример, вы можете проверить его, просмотрев http://localhost:8000/ Затем рабочий отправит сообщение мастеру, и мастер ответит:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });

  }
} else {
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}
8 голосов
/ 12 июня 2012

Я нашел эту тему, когда искал способ отправить сообщение всем дочерним процессам, и, к счастью, смог выяснить это благодаря комментариям о массивах.Просто хотел проиллюстрировать потенциальное решение для отправки сообщения всем дочерним процессам, использующим этот подход.

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var workers = [];

if (cluster.isMaster) {
  // Broadcast a message to all workers
  var broadcast = function() {
    for (var i in workers) {
      var worker = workers[i];
      worker.send({ cmd: 'broadcast', numReqs: numReqs });
    }
  }

  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd) {
        switch (msg.cmd) {
          case 'notifyRequest':
            numReqs++;
          break;
          case 'broadcast':
            broadcast();
          break;
        }
    });

    // Add the worker to an array of known workers
    workers.push(worker);
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function(msg) {
    switch(msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
      break;
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}
3 голосов
/ 02 июля 2015

Вот как я реализовал решение аналогичной проблемы. Подключившись к cluster.on('fork'), вы можете прикреплять обработчики сообщений к работникам по мере их разветвления (вместо того, чтобы хранить их в массиве), что дает дополнительное преимущество в случаях, когда работники умирают или отключаются, а новый работник разветвляется.

Этот фрагмент отправит сообщение от мастера всем работникам.

if (cluster.isMaster) {
    for (var i = 0; i < require('os').cpus.length; i++) {
        cluster.fork();
    }

    cluster.on('disconnect', function(worker) {
        cluster.fork();
    }

    // When a new worker process is forked, attach the handler
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as above.
    cluster.on('fork', function(worker) {
        worker.on('message', messageRelay);
    }

    var messageRelay = function(msg) {
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send(msg);
        });
    };
}
else {
    process.on('message', messageHandler);

    var messageHandler = function messageHandler(msg) {
        // Worker received message--do something
    };
}
1 голос
/ 19 августа 2017

Я понимаю вашу цель широковещательной передачи всем рабочим процессам узла в кластере, хотя вы не можете отправить компонент сокета как таковой, но есть обходной путь для цели, которая будет обслуживаться. Я попробую объяснить на примере:

Шаг 1: когда для действия клиента требуется широковещательная рассылка:

Child.js (Process that has been forked) :

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
{
    process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message});
}) 

Шаг 2: На стороне создания кластера

Server.js (Place where cluster forking happens):

if (cluster.isMaster) {

  for (var i = 0; i < numCPUs; i++) {

    var worker = cluster.fork();

    worker.on('message', function (data) {
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
       console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  }

  cluster.on('exit', function (worker, code, signal) {
    var newWorker = cluster.fork();
    newWorker.on('message', function (data) {
      console.log(data);
      if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
        console.log(data.cmd,data);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  });
} 
else {
  //Node Js App Entry
  require("./Child.js");
}

Шаг 3: Вещание в дочернем процессе -

-> Поместите это перед io.on ("connection") в Child.js

process.on("message", function(data){
    if(data.cmd === "BROADCAST_TO_WORKER"){
        io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id });
    }
});

Надеюсь, это поможет. Пожалуйста, дайте мне знать, если требуются дополнительные разъяснения.

1 голос
/ 16 декабря 2011

Вы должны иметь возможность отправить сообщение от мастера к работнику, как это:

worker.send({message:'hello'})

потому что "cluster.fork реализован поверх child_process.fork" (cluster.fork реализован поверх child_process.fork)

...