Связь ZeroMQ PUSH / PULL не работает через IPC, но работает через TCP - PullRequest
1 голос
/ 11 июля 2019

Это было задание из книги под названием " Node.js 8 Правильный путь ". Вы можете увидеть это ниже:

https://pp.userapi.com/c850628/v850628437/164aec/sFdn3icXb7Y.jpg

https://pp.userapi.com/c850628/v850628437/164ae4/4x02I_it_v8.jpg

Это мое решение:

'use strict';
const zmq = require('zeromq');
const cluster = require('cluster');

const push = zmq.socket('push');
const pull = zmq.socket('pull');

const cores_num = require('os').cpus().length;
let workers_num = 0;

push.bind('tcp://127.0.0.1:9998');
pull.bind('tcp://127.0.0.1:9999');
// push.bind('ipc://push.ipc');
// pull.bind('ipc://pull.ipc');

if (cluster.isMaster) {
  for (let j = 0; j < cores_num; j++) {
    cluster.fork();
  }

  cluster.on('online', (worker) => {
    console.log(`Worker with pid ${worker.process.pid} is created!`);
  });

  pull.on('message', (data) => {
    const response = JSON.parse(data.toString());

    if (response.type === 'ready') {
      if (workers_num >= 0 && workers_num < 3) {
        workers_num++;

        if (workers_num == 3) {
          console.log('Ready!');

          for (let i = 0; i < 10; i++) {
            push.send(JSON.stringify({
              type: 'job',
              data: `This message has id ${i}`
            }));
          }
        }
      }
    } else if (response.type === 'result') {
      console.log(data.toString());
    }
  });
} else {
  const worker_push = zmq.socket('push');
  const worker_pull = zmq.socket('pull');

  worker_pull.connect('tcp://127.0.0.1:9998');
  worker_push.connect('tcp://127.0.0.1:9999');
  // worker_pull.connect('ipc://push.ipc');
  // worker_push.connect('ipc://pull.ipc');

  worker_push.send(JSON.stringify({
    type: 'ready'
  }));

  worker_pull.on('message', data => {
    const request = JSON.parse(data);

    if (request.type === 'job') {
      console.log(`Process ${process.pid} got message ${request.data}`);
      worker_push.send(JSON.stringify({
        type: 'result',
        data: `This message is a response from process ${process.pid}`,
        time: Date.now()
      }));
    }
  });
}

Как видите, он работает только тогда, когда PUSH / PULL-сокеты и рабочие общаются через TCP, но я хочу знать причину, по которой он не работает через IPC.

Обновление (см. Условие 4 ниже «путь должен быть доступным для записи»): enter image description here Я надеюсь, что вы поможете мне найти проблему.

Ответы [ 2 ]

0 голосов
/ 11 июля 2019

Мало вещей:

Ваш IPC-путь неверен:

У вас есть ipc://push.ipc (2 слеша), которые вам действительно нужны ipc:///push.ipc Протокол ipc://, тогда вам нужен путь к файлу /push.ipc

Права доступа к файлу:

Имеет ли ваш процесс разрешение на запись в корневой каталог? Если вы не являетесь пользователем root, я бы не подумал.

Я бы изменил путь на что-то вроде /tmp/push.ipc, которое в большинстве систем доступно для записи всем пользователям.

В этом случае ваш URL должен быть:

ipc:///tmp/push.ipc

ветвление

Я вообще не использую ноды, но, основываясь на своих знаниях о разветвлении других языков, я думаю, что вся программа снова запускается в другом процессе / потоке.

В этом случае не каждый работник снова пытается bind(), поскольку код создания / связывания сокета находится за пределами if (cluster.isMaster) {

Это должно выглядеть так, я думаю

if (cluster.isMaster) {

  const push = zmq.socket('push');
  const pull = zmq.socket('pull');
  push.bind('ipc://push.ipc');
  pull.bind('ipc://pull.ipc');
  ....
}
0 голосов
/ 11 июля 2019

Я хочу знать причину, по которой он не работает через IPC.

Существует несколько условий использования ipc:// транспортного класса дляиспользуя масштабируемые формальные архетипы ZeroMQ и получаем .bind()/.connect() -ed

1) Межпроцессный транспорт передает сообщения между локальными процессами, используя системно-зависимый механизм IPC.Межпроцессный транспорт в настоящее время реализован только в операционных системах, которые предоставляют доменные сокеты UNIX.

2) Как со стороны .bind(), так и со стороны .connect() необходимо соответствовать нанекоторый выполнимый адрес :

          push.bind(    'ipc://push.sock'     ); // will never meet its counterparty
 // ------------------(--|||://^v^v^v^v^v^v^v )
   worker_pull.connect( 'ipc:///tmp/push.sock'); //         if used other ipc://-address

3) Если какой-либо второй процесс связывается с конечной точкой, уже связанной процессом, это будет успешно выполнено, и первый процесс потеряет свою привязку.В этом случае транспортный класс ipc:// не согласуется с транспортными классами tcp:// или inproc://.

4) Путь к адресу конечной точки должен быть доступным для записипроцесс.Когда конечная точка начинается с /, например, ipc: /// pathname, это будет абсолютный путь.Если конечная точка указывает каталог, который не существует, привязка должна завершиться неудачей.

5) Когда имя пути конечной точки начинается с @, должно использоваться абстрактное пространство имен.Абстрактное пространство имен не зависит от файловой системы, и если процесс попытается связать конечную точку, уже связанную с процессом, произойдет сбой.Подробности смотрите в unix (7).

6) ipc:// Путь к транспортному классу имеет максимальный размер, который зависит от операционной системы.В Linux максимум составляет 113 символов, включая префикс "ipc://" (107 символов для реального имени пути).

7) Когда в zmq_bind() использовалась спецификация конечной точки с подстановочным знаком *,вызывающая сторона должна использовать реальную конечную точку, полученную из опции сокета ZMQ_LAST_ENDPOINT, чтобы отсоединить эту конечную точку от сокета с помощью zmq_unbind().

...