Принудительная синхронизация Node.js IPC - PullRequest
8 голосов
/ 17 октября 2019

У меня есть Node-сервер, который создает дочерний процесс с fork() с использованием IPC. В какой-то момент ребенок отправляет результаты родителю на частоте около 10 Гц как часть длительной задачи. Когда полезная нагрузка, переданная в process.send(), мала, все работает хорошо: каждое отправляемое мной сообщение немедленно принимается и обрабатывается родителем.

Однако, когда полезная нагрузка «большая» - я не определилограничение точного размера - вместо того, чтобы быть немедленно полученным родителем, все полезные данные сначала отправляются, и только после того, как дочерний элемент выполняет свою долгосрочную задачу, родитель получает и обрабатывает сообщения.

tl; dr visual:

Хорошо (происходит с малой полезной нагрузкой):

child:  send()
parent: receive()
child:  send()
parent: receive()
child:  send()
parent: receive()
...

Плохо (происходит с большой полезной нагрузкой):

child:  send()
child:  send()
child:  send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
  1. Это ошибка? (Изменить: поведение происходит только в OS X, а не в Windows или Linux)
  2. Есть ли способ избежать этого, кроме попыток сохранить малую полезную нагрузку IPC?

Редактировать 2 : в приведенном ниже примере кода используется время и счетчик итераций для выбора времени отправки обновления. (В моем реальном коде также возможно отправить обновление после n итераций или после того, как цикл достигнет определенных результатов.) Как таковой, переписать код, чтобы использовать setInterval / setTimeout вместоцикл - это последнее средство для меня, так как он требует от меня удаления функций.

Редактировать : Вот тестовый код, который воспроизводит проблему. Однако он воспроизводится только в OS X, а не в Windows или Linux:

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);
   if (match) {
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

if (process.send) process.on('message', msg => run(msg));

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Примерно около 8к проблема возникает. Например, при запросе http://localhost:8080/15 против http://localhost:8080/123456

/15
worker: send()  > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send()  > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send()  > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send()  > 123456 bytes 1571324276973
worker: send()  > 123456 bytes 1571324277174
worker: send()  > 123456 bytes 1571324277375
child done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393

Опыт работы на узлах v12.7 и v12.12.

Ответы [ 4 ]

3 голосов
/ 17 октября 2019

l Наличие продолжительного и блокирующего цикла while в сочетании с сокетами или файловыми дескрипторами в узле всегда свидетельствует о том, что что-то сделано неправильно.

Без возможности проверить всю настройку трудно сказатьесли моя заявка действительно верна, но короткие сообщения, вероятно, могут быть переданы непосредственно в один блок операционной системе, которая затем передает ее другому процессу. При более крупных сообщениях узел должен будет ждать, пока ОС сможет получить больше данных, поэтому отправка ставится в очередь, и, поскольку у вас есть блокировка while, отправка остается в очереди до тех пор, пока loop не закончится.

Такна ваш вопрос, это не ошибка.

Поскольку вы используете недавнюю версию nodejs, я бы использовал await и async вместо и создал бы неблокирующую while, аналогичную sleep в этот ответ . await позволит перехватить цикл события узла, если processSome вернет ожидающее обещание.

Для вашего кода, который на самом деле не отражает реальный вариант использования, трудно сказать, как правильно его решить. Если вы не выполните в processSome ничего асинхронного, что позволило бы перехватить ввод-вывод, вам нужно делать это вручную на регулярной основе, например, await new Promise(setImmediate);.

async function run() {
  let interval = setInterval(() => {
    process.send({action:'update', data:status()});
    console.log('child:  send()');
  }, 1/10)

  while(keepGoing()) {
    await processSome();
  }

  clearInterval(interval)
}
2 голосов
/ 26 октября 2019

По поводу вашего первого вопроса

Это ошибка? (Изменить: поведение происходит только в OS X, а не в Windows или Linux)

Это определенно не ошибка, и я мог бы воспроизвести ее на моем Windows 10 (для размера 123456). Это происходит главным образом из-за основной буферизации ядра и переключения контекста ОС, поскольку два отдельных процесса (не отсоединенных) взаимодействуют через дескриптор ipc.

Относительно вашего второго вопроса

Есть лиМожно ли как-нибудь избежать этого, кроме попыток сохранить малую полезную нагрузку IPC?

Если я правильно понимаю проблему, вы пытаетесь решить для каждого http-запроса каждый раз, когда работник отправляетчанк обратно на сервер, вы хотите, чтобы сервер обработал его, прежде чем вы получите следующий чанк. Вот как я понимаю, когда вы сказали, что обработка синхронизации

Есть способ использовать обещания, но я хотел бы использовать генераторы на рабочих местах. Лучше организовать поток между сервером и работником

Поток:

  1. Сервер отправляет целое число работнику независимо от того, что он получает из http-запроса
  2. Worker затем создает и запускает генератор для отправки первого чанка
  3. Урожай работника после отправки чанка
  4. Запросы сервера на большее
  5. Работник генерирует больше, так как сервер запрашивает больше (* только если доступно)
  6. Если не более, рабочий отправляет конец кусков
  7. Сервер просто регистрирует, что рабочий завершен, и больше не запрашивает

server.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc'], detached:false};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', (msg) => {
   //FLOW 7: Worker is done, just log
   if (msg.action == 'end'){
      console.log(`child ended for a particular request`)
   } else {
      console.log(`parent: receive(${msg.data.iter}) ${msg.data.msg.length} bytes`, Date.now())
      //FLOW 4: Server requests for more
      child.send('more')
   }   

});

require('http').createServer((req, res) => {
   console.log(req.url);
   const match = /\d+/.exec(req.url);   
   if (match) {
      //FLOW 1: Server sends integer to worker
      child.send(match[0]*1);
      res.writeHead(200, {'Content-Type':'text/plain'});
      res.end(`Sending packets of size ${match[0]}`);
   } else {
      res.writeHead(404, {'Content-Type':'text/plain'});
      res.end('what?');
   }
}).listen(8080);

worker.js

let runner
if (process.send) process.on('message', msg => {   
   //FLOW 2: Worker creates and runs a generator to send the first chunk
   if (parseInt(msg)) {
      runner = run(msg)
      runner.next()
   }
   //FLOW 5: Server asked more, so generate more chunks if available
   if (msg == "more") runner.next()

});

//generator function *
function* run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e7; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.log(`worker: send(${i})  > ${messageSize} bytes`, now);
         let j = i         
         process.send({action:'update', data:{msg, iter:j}});
         //FLOW 3: Worker yields after sending the chunk
         yield
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   //FLOW 6: If no more, worker sends end signal
   process.send({action:'end'});
   console.log('worker done');
}

Если мы знаем точный вариант использования, могут быть более эффективные способы программированияЭто. Это всего лишь один из способов синхронизации дочернего процесса, сохраняющий большую часть исходного кода.

1 голос
/ 26 октября 2019

Хотя я согласен с другими, что оптимальным решением будет то, когда дочерний процесс может добровольно отказаться от контроля в конце каждого цикла, позволяя запускать процессы очистки буфера, существует простое / быстрое / грязное исправление, которое получаету вас почти синхронное поведение, и это заставляет ребенка send вызывать блокировку.

Используйте тот же server.js, что и раньше, и почти тот же worker.js, добавив всего одну строку:

worker.js

if (process.send) process.on('message', msg => run(msg));

// cause process.send to block until the message is actually sent                                                                                
process.channel.setBlocking(true);

function run(messageSize) {
   const msg = new Array(messageSize+1).join('x');
   let lastUpdate = Date.now();
   for (let i=0; i<1e6; ++i) {
      const now = Date.now();
      if ((now-lastUpdate)>200 || i%5000==0) {
         console.error(`worker: send()  > ${messageSize} bytes`, now);
         process.send({action:'update', data:msg});
         lastUpdate = Date.now();
      }
      Math.sqrt(Math.random());
   }
   console.log('worker done');
}

Вывод:

/123456
worker: send()  > 123456 bytes 1572113820591
worker: send()  > 123456 bytes 1572113820630
parent: receive() 123456 bytes 1572113820629
parent: receive() 123456 bytes 1572113820647
worker: send()  > 123456 bytes 1572113820659
parent: receive() 123456 bytes 1572113820665
worker: send()  > 123456 bytes 1572113820668
parent: receive() 123456 bytes 1572113820678
worker: send()  > 123456 bytes 1572113820678
parent: receive() 123456 bytes 1572113820683
worker: send()  > 123456 bytes 1572113820683
parent: receive() 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820687
worker: send()  > 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820692
parent: receive() 123456 bytes 1572113820696
worker: send()  > 123456 bytes 1572113820696
parent: receive() 123456 bytes 1572113820700
worker: send()  > 123456 bytes 1572113820700
parent: receive() 123456 bytes 1572113820703
worker: send()  > 123456 bytes 1572113820703
parent: receive() 123456 bytes 1572113820706
worker: send()  > 123456 bytes 1572113820706
parent: receive() 123456 bytes 1572113820709
worker: send()  > 123456 bytes 1572113820709
parent: receive() 123456 bytes 1572113820713
worker: send()  > 123456 bytes 1572113820714
worker: send()  > 123456 bytes 1572113820721
parent: receive() 123456 bytes 1572113820722
parent: receive() 123456 bytes 1572113820725
worker: send()  > 123456 bytes 1572113820725
parent: receive() 123456 bytes 1572113820727
1 голос
/ 24 октября 2019

Если вам нужно гарантировать получение сообщения перед отправкой следующего, вы можете подождать, пока мастер подтвердит получение. Конечно, это задержит отправку следующего сообщения, но поскольку ваша логика зависит от времени и номера итерации, чтобы определить, следует ли отправлять сообщение, тогда это может подойти для вашего случая.

Реализация потребует от каждого работникасоздайте обещание для каждого отправленного сообщения и дождитесь ответа от мастера, прежде чем разрешить обещание. Это также означает, что вам нужно определить, какое сообщение подтверждено, на основе идентификатора сообщения или чего-то уникального, если у вас одновременно несколько сообщений или рабочих.

вот модифицированный код

сервер.js

const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);

child.on('message', msg =>  {
    console.log(`parent: receive() ${msg.data.length} bytes`, Date.now())
    // reply to the child with the id
    child.send({ type: 'acknowledge', id: msg.id });
});

...

worker.js

const pendingMessageResolves = {};

if (process.send) process.on('message', msg => { 
    if (msg.type === 'acknowledge') {
        // call the stored resolve function
        pendingMessageResolves[msg.id]();
        // remove the function to allow the memory to be freed
        delete pendingMessageResolves[msg.id]
    } else {
        run(msg) 
    }
});

const sendMessageAndWaitForAcknowledge = (msg) => new Promise(resolve => {
    const id = new uuid(); // or any unique field
    process.send({ action:'update', data: msg, id });
    // store a reference to the resolve function
    pendingMessageResolves[id] = resolve;
})

async function run(messageSize) {
    const msg = new Array(messageSize+1).join('x');
    let lastUpdate = Date.now();
    for (let i=0; i<1e7; ++i) {
        const now = Date.now();
        if ((now-lastUpdate)>200 || i%5000==0) {
            console.log(`worker: send()  > ${messageSize} bytes`, now);
            await sendMessageAndWaitForAcknowledge(msg); // wait until master replies
            lastUpdate = Date.now();
        }
        Math.sqrt(Math.random());
    }
    console.log('worker done');
}

ps Я не тестировал код, поэтому может потребоваться некоторая настройка, но идеядолжен держать.

...