ZeroMQ как отодвинуть от потребителя к производителю - PullRequest
0 голосов
/ 19 апреля 2019

Я решаю упражнение, используя ZeroMQ с паттерном Push / Pull, где потребитель должен передать результат через другую трубу производителю.У меня есть следующий пример:

'use strict';
const cluster = require('cluster');
const fs = require('fs');
const zmq = require('zeromq');
const numWorkers = require('os').cpus().length;

if (cluster.isMaster) {
  // master's queue
  const pusher_m = zmq.socket('push').bind('ipc://filer-pp-m.ipc');
  // worker's queue
  const pusher_w = zmq.socket('push').bind('ipc://filer-pp-w.ipc');

  // Forward product to workers's queue
  for (let i = 0; i < 5; i++) {
    pusher_w.send(
      JSON.stringify({
        task: 'code'
      })
    );
  }

  // Event pull (master consumes master's queue)
  const puller_m = zmq.socket('pull').connect('ipc://filer-pp-m.ipc');
  puller_m.on('message', data => {
    const consumer = JSON.parse(data);
    console.log(`Worker's status: ${consumer.status}`);
  });

  // setup: initialize workers

  cluster.on('online',
    worker => console.log(`Worker ${worker.process.pid} is online.`));

  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

} else {

  // Event pull (worker consumes worker's queue)
  const puller_w = zmq.socket('pull').connect('ipc://filer-pp-w.ipc');
  puller_w.on('message', data => {
    const consumer = JSON.parse(data);
    console.log(`Worker's task is ${consumer.task}`);
  });

  // Forward result to master's queue)
  const pusher_m = zmq.socket('push').connect('ipc://filer-pp-m.ipc');
  pusher_m.send(
    JSON.stringify({
      status: 'ready'
    })
  );
}

Мне все еще нужно добавить больше кода, чтобы он работал / течет, как ожидалось.Но сейчас я пытаюсь решить проблему передачи результата от рабочего (дочерний процесс) к производителю (главный процесс), как вы можете видеть на выходе.

// output
/*
Worker 4341 is online.
Worker 4347 is online.
Worker 4346 is online.
Worker 4353 is online.
Worker's task is code
Worker's task is code
Worker's task is code
Worker's task is code
Worker's task is code
*/

Я попытался, для целей тестирования, связать сокет работника внутри условного блока Работника (например, еще), и кажется, что производитель (мастер) смог извлечь одно сообщение из очереди мастера.Но я считаю, что это не правильно, так как мастер, кажется, является стабильным процессом для удержания связующих сокетов.

Может ли кто-нибудь помочь мне?Любые сомнения, просто спросите комментарии.

...