Координация параллельного выполнения в node.js - PullRequest
79 голосов
/ 08 января 2011

Модель событий, основанная на событиях, в node.js несколько усложняет координацию потока программы.

Простое последовательное выполнение превращается во вложенные обратные вызовы, что достаточно просто (хотя и немного запутанно, чтобы записать).

Но как насчет параллельного выполнения?Допустим, у вас есть три задачи A, B, C, которые могут выполняться параллельно, и когда они будут выполнены, вы хотите отправить их результаты в задачу D.

Для модели с разветвлением / объединением это будет

  • вилка A
  • вилка B
  • вилка C
  • соединение A, B, C, запуск D

Как мненаписать что в node.js?Есть ли лучшие практики или кулинарные книги?Нужно ли вручную катить решение каждый раз, или есть какая-то библиотека с помощниками для этого?

Ответы [ 7 ]

126 голосов
/ 08 января 2011

В node.js нет ничего параллельного, так как он однопоточный.Однако несколько событий можно запланировать и запустить в последовательности, которую вы не можете определить заранее.А некоторые вещи, такие как доступ к базе данных, на самом деле «параллельны» в том смысле, что сами запросы к базе данных выполняются в отдельных потоках, но после завершения реинтегрируются в поток событий.

Итак, как запланировать обратный вызов для несколькихобработчики событий?Что ж, это один из распространенных методов, используемых в анимациях в браузере на стороне javascript: используйте переменную для отслеживания завершения.

Это звучит как хак, и это звучит потенциально беспорядочно, оставляя кучу глобальных переменных вокругделать отслеживание и на меньшем языке это было бы.Но в javascript мы можем использовать замыкания:

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var callback = function () {
    counter --;
    if (counter == 0) {
      shared_callback()
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A,B,C],D);

В приведенном выше примере мы сохраняем код простым, предполагая, что функции async и callback не требуют аргументов.Конечно, вы можете изменить код для передачи аргументов асинхронным функциям и заставить функцию обратного вызова накапливать результаты и передавать ее в функцию shared_callback.


Дополнительный ответ:

На самом деле, дажекак таковая, функция fork() уже может передавать аргументы асинхронным функциям с помощью замыкания:

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);

остается только накопить результаты из A, B, C и передать ихк Д.


Еще более дополнительный ответ:

Я не удержался.Думал об этом во время завтрака.Вот реализация fork(), которая накапливает результаты (обычно передаваемые в качестве аргументов функции обратного вызова):

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  function makeCallback (index) {
    return function () {
      counter --;
      var results = [];
      // we use the arguments object here because some callbacks 
      // in Node pass in multiple arguments as result.
      for (var i=0;i<arguments.length;i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter == 0) {
        shared_callback(all_results);
      }
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](makeCallback(i));
  }
}

Это было достаточно просто.Это делает fork() достаточно универсальным и может использоваться для синхронизации нескольких неоднородных событий.

Пример использования в Node.js:

// Read 3 files in parallel and process them together:

function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
  file1data = result[0][1];
  file2data = result[1][1];
  file3data = result[2][1];

  // process the files together here
}

fork([A,B,C],D);

Обновление

Этот код был написан до существования таких библиотек, как async.js или различных библиотек, основанных на обещаниях.Я хотел бы верить, что async.js был вдохновлен этим, но у меня нет никаких доказательств этого.Во всяком случае ... если вы думаете сделать это сегодня, посмотрите async.js или обещания.Просто рассмотрите приведенный выше ответ как хорошее объяснение / иллюстрацию того, как работают такие вещи, как async.parallel.

Для полноты картины, вот как вы это сделаете с async.parallel:

var async = require('async');

async.parallel([A,B,C],D);

Обратите внимание, что async.parallel работает точно так же, как и функция fork, которую мы реализовали выше.Основное отличие состоит в том, что он передает ошибку в качестве первого аргумента D и обратный вызов в качестве второго аргумента согласно соглашению node.js.

Используя обещания, мы напишем его следующим образом:

// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A,B,C]).then(D);
10 голосов
/ 24 сентября 2011

Я полагаю, что теперь модуль "async" обеспечивает эту параллельную функциональность и примерно такой же, как и функция fork выше.

5 голосов
/ 20 февраля 2012

В модуле futures есть подмодуль под названием join , который мне нравилось использовать:

Объединяет асинхронные вызовы вместе, подобно тому, как pthread_join работает для потоков.

В файле readme приведены некоторые хорошие примеры использования его вольным стилем или использования подмодуля future с использованием шаблона Promise. Пример из документов:

var Join = require('join')
  , join = Join()
  , callbackA = join.add()
  , callbackB = join.add()
  , callbackC = join.add();

function abcComplete(aArgs, bArgs, cArgs) {
  console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}

setTimeout(function () {
  callbackA(null, 'Hello');
}, 300);

setTimeout(function () {
  callbackB(null, 'World');
}, 500);

setTimeout(function () {
  callbackC(null, '!');
}, 400);

// this must be called after all 
join.when(abcComplete);
2 голосов
/ 30 апреля 2011

Другим вариантом может быть модуль Step для узла: https://github.com/creationix/step

2 голосов
/ 08 января 2011

Здесь может быть возможно простое решение: http://howtonode.org/control-flow-part-ii прокрутите до Параллельные действия. Другой способ состоит в том, чтобы все A, B и C совместно использовали одну и ту же функцию обратного вызова, чтобы эта функция имела глобальный или, по крайней мере, инкрементный для функции инкрементор, если все три вызвали обратный вызов, то пусть он выполняет D Конечно, вам также нужно где-то хранить результаты A, B и C.

0 голосов
/ 19 октября 2015

В дополнение к популярным обещаниям и асинхронной библиотеке, существует 3 элегантный способ - использование «проводки»:

var l = new Wire();

funcA(l.branch('post'));
funcB(l.branch('comments'));
funcC(l.branch('links'));

l.success(function(results) {
   // result will be object with results:
   // { post: ..., comments: ..., links: ...}
});

https://github.com/garmoshka-mo/mo-wire

0 голосов
/ 22 декабря 2014

Вы можете попробовать эту крошечную библиотеку: https://www.npmjs.com/package/parallel-io

...