В основном, использование одного рабочего потока и ожидание его выполнения всегда будет медленнее, чем выполнение работы в локальном потоке, потому что:
- Создание потоков занимает время.
- Отправка данных между потоками занимает время.
Вы можете получить выигрыш, если у вас есть отдельные части работы, которые можно обрабатывать параллельно, и несколько ядер ЦП для работы. В этой ситуации вы можете отправлять разные части работы нескольким работникам (до столько процессорных ядер, сколько доступно), при условии, что работа не ограничена каким-либо другим единственным ресурсом, за который они все будут конкурировать.
Ниже я опубликовал программу, которая сортирует 12 массивов локально и через рабочих с повторными гонками. (При сортировке в работниках он передает данные массива работнику, а затем обратно, а не копирует их.) Он запускает работников заранее и использует их повторно, но включает время, затраченное на определение среднее время, затрачиваемое работниками на выполнение своей работы, поэтому мы включаем все накладные расходы.
На моей рабочей станции с четырьмя ядрами ЦП и предоставлением ей рабочего для каждого ядра работники легко выигрывают:
# of workers: 4
Local average: 8790.010573029518ms
Workers' average: 3550.658817946911ms
Workers win, taking 40.39425% of the time local did
Однако, если я ограничу его одним рабочим, то у него чисто накладные расходы, и локальный поток выигрывает:
# of workers: 1
Local average: 8907.022233068943ms
Workers' average: 8953.339844942093ms
Local wins, taking 99.48268% of the time workers did
Даже два рабочих выигрывают, потому что они могут работать параллельно на этом мульти -core machine:
# of workers: 2
Local average: 8782.853852927685ms
Workers' average: 4754.60275799036ms
Workers win, taking 54.13505% of the time local did
На одноядерном компьютере (если вы можете найти его больше) эти два рабочих снова окажутся лишними, и локальный поток победит.
Вот main.js
:
const os = require('os');
const { Worker } = require('worker_threads');
const { performance } = require('perf_hooks');
const MAX_UINT32 = (2**32)-1;
const ARRAY_SIZE = 100000;
const ARRAY_COUNT = 12;
const workerCount = +process.argv[2] || os.cpus().length;
const raceCount = +process.argv[3] || 5;
class WorkerQueue {
#workers;
#available;
#pending;
#checkPending = () => { // private methods still aren't unflagged in v13, so...
if (this.#available.length && this.#pending.length) {
const resolve = this.#pending.shift();
const worker = this.#available.shift();
resolve(worker);
}
};
constructor(...workers) {
this.#workers = new Set(workers);
this.#available = [...this.#workers];
this.#pending = [];
}
get() {
return new Promise(resolve => {
this.#pending.push(resolve);
this.#checkPending();
});
}
release(worker) {
if (!this.#workers.has(worker)) {
throw new Error("Uknown worker");
}
this.#available.push(worker);
this.#checkPending();
}
terminate() {
for (const worker of this.#workers) {
worker.terminate();
}
this.#workers = new Set();
this.#available = [];
this.#pending = [];
}
}
const {workers, workerCreationTime} = createWorkers();
main();
function createWorkers() {
const start = performance.now();
const workers = new WorkerQueue(
...Array.from({length: workerCount}, () => new Worker("./worker.js"))
);
const workerCreationTime = performance.now() - start;
return {workers, workerCreationTime};
}
async function main() {
try {
console.log(`Workers: ${workerCount} (in ${workerCreationTime}ms), races: ${raceCount}`);
let localAverage = 0;
let workersAverage = 0;
for (let n = 1; n <= raceCount; ++n) {
console.log(`Race #${n}:`);
const {localTime, workersTime} = await sortRace();
localAverage += localTime;
workersAverage += workersTime;
}
// Include the time it took to create the workers in the workers' average, as
// though we'd created them for each race. (We didn't because doing so would
// have given the local thread an advantage: after the first race, it's warmed
// up, but a new worker would be cold. So we let the workers be warm but add
// the full creation time into each race.
workersAverage += workerCreationTime;
console.log("----");
console.log(`# of workers: ${workerCount}`);
console.log(`Local average: ${localAverage}ms`);
console.log(`Workers' average: ${workersAverage}ms`);
if (localAverage > workersAverage) {
showWinner("Workers win", "local", workersAverage, localAverage);
} else {
showWinner("Local wins", "workers", localAverage, workersAverage);
}
workers.terminate();
} catch (e) {
console.error(e.message, e.stack);
}
}
function showWinner(msg, loser, winnerAverage, loserAverage) {
const percentage = (winnerAverage * 100) / loserAverage;
console.log(`${msg}, taking ${percentage.toFixed(5)}% of the time ${loser} did`);
}
async function sortRace() {
// Create a bunch of arrays for local to sort
const localArrays = Array.from({length: ARRAY_COUNT}, () => createRandomArray(ARRAY_SIZE));
// Copy those array so the workers are dealing with the same values
const workerArrays = localArrays.map(array => new Uint32Array(array));
const localStart = performance.now();
const localResults = await Promise.all(localArrays.map(sortLocal));
const localTime = performance.now() - localStart;
checkResults(localResults);
console.log(`Local time: ${localTime}ms`);
const workerStart = performance.now();
const workersResults = await Promise.all(workerArrays.map(sortViaWorker));
const workersTime = performance.now() - workerStart;
checkResults(workersResults);
console.log(`Workers' time: ${workersTime}ms`);
return {localTime, workersTime};
}
async function sortLocal(array) {
await Promise.resolve(); // To make it start asynchronously, like `sortViaWorker` does
array.sort((a, b) => a - b);
return array;
}
async function sortViaWorker(array) {
const worker = await workers.get();
return new Promise(resolve => {
worker.once("message", result => {
workers.release(worker);
resolve(result.array);
});
worker.postMessage({array}, [array.buffer]);
});
}
function checkResults(arrays) {
for (const array of arrays) {
const badIndex = array.findIndex((value, index) => index > 0 && array[index-1] > value);
if (badIndex !== -1) {
throw new Error(
`Error, array entry ${badIndex} has value ${array[badIndex]} ` +
`which is > previous value ${array[badIndex-1]}`
);
}
}
}
function createRandomArray(length) {
const array = new Uint32Array(Uint32Array.BYTES_PER_ELEMENT * length);
return randomFillArray(array);
}
function randomFillArray(array) {
for (let length = array.length, i = 0; i < length; ++i) {
array[i] = Math.random() * MAX_UINT32;
}
return array;
}
и worker.js
:
const { parentPort } = require("worker_threads");
parentPort.on("message", ({array}) => {
array.sort((a, b) => a - b);
parentPort.postMessage({array}, [array.buffer]);
});