Рабочие в javascript не так быстро - PullRequest
1 голос
/ 14 февраля 2020

Я даю попытку рабочим в js, и я попытался сделать простую сортировку, используя ту же самую функцию js. Для сравнения я использую функцию asyn c, которая сортирует 60000 случайных чисел. Первый будет сортировать случайные числа, как обычно мы привыкли делать это.

async function normalSort(arr) {
    return new Promise((res) => {
        let copy = arr;
        copy.sort((a, b) => a > b ? 1 : -1);
        return res(copy)
    })
}

другая - нормальная функция, которая будет вызываться для функции workerHandler

const { Worker, parentPort, workerData } = require('worker_threads');

function sort(data) {
    let copy = data;
    copy.sort((a, b) => a > b ? 1 : -1);
    parentPort.postMessage(copy)
    process.exit();
}


sort(workerData); 

функция обработчика рабочих

const os = require('os');
const path = require('path');
const { Worker } = require('worker_threads');

async function workersHandler(arr) {
    const startTime = Date.now();
    const cpusAmount = os.cpus().length;
    const chSize = Math.ceil(arr.length / cpusAmount)
    let promises = [];
    for (let i = 0; i < arr.length; i += chSize) {
        const end = i + chSize;
        const currentChunk = arr.slice(i, end);
        const promise = new Promise((res, rej) => {
            //@ts-ignore
            const worker = new Worker(path.join(__dirname, '..', '/utils/sort.js'), { workerData: currentChunk })

            worker.on('message', res)
            worker.on('error', rej)
        })
        promises.push(promise);
    }
    let result = await Promise.all(promises)
    return result;
}

и основная функция, которая вызовет другие функции

function main() {
    let arr = new Array(60000).fill(0).map((_, i) => Math.round(Math.random() * 100));
    const startTime = Date.now();

    workersHandler(arr).then(r => console.log('workers sort', Date.now() - startTime + ' ms'))
    normalSort(arr).then(r => console.log('normal sort', Date.now() - startTime + ' ms'))
}
main();

Удивительно, но обычная функция сортировки работает намного быстрее и работает в одном потоке. Я получаю для рабочей функции 101 мс для нормальной функции сортировки 53 мс Кто-то может объяснить мне, почему эти странные результаты ?. Работники не так быстры, или я делаю неправильную реализацию?

Ответы [ 2 ]

2 голосов
/ 14 февраля 2020

В основном, использование одного рабочего потока и ожидание его выполнения всегда будет медленнее, чем выполнение работы в локальном потоке, потому что:

  • Создание потоков занимает время.
  • Отправка данных между потоками занимает время.

Вы можете получить выигрыш, если у вас есть отдельные части работы, которые можно обрабатывать параллельно, и несколько ядер ЦП для работы. В этой ситуации вы можете отправлять разные части работы нескольким работникам (до столько процессорных ядер, сколько доступно), при условии, что работа не ограничена каким-либо другим единственным ресурсом, за который они все будут конкурировать.

Ниже я опубликовал программу, которая сортирует 12 массивов локально и через рабочих с повторными гонками. (При сортировке в работниках он передает данные массива работнику, а затем обратно, а не копирует их.) Он запускает работников заранее и использует их повторно, но включает время, затраченное на определение среднее время, затрачиваемое работниками на выполнение своей работы, поэтому мы включаем все накладные расходы.

На моей рабочей станции с четырьмя ядрами ЦП и предоставлением ей рабочего для каждого ядра работники легко выигрывают:

# of workers:     4
Local average:    8790.010573029518ms
Workers' average: 3550.658817946911ms
Workers win, taking 40.39425% of the time local did

Однако, если я ограничу его одним рабочим, то у него чисто накладные расходы, и локальный поток выигрывает:

# of workers:     1
Local average:    8907.022233068943ms
Workers' average: 8953.339844942093ms
Local wins, taking 99.48268% of the time workers did

Даже два рабочих выигрывают, потому что они могут работать параллельно на этом мульти -core machine:

# of workers:     2
Local average:    8782.853852927685ms
Workers' average: 4754.60275799036ms
Workers win, taking 54.13505% of the time local did

На одноядерном компьютере (если вы можете найти его больше) эти два рабочих снова окажутся лишними, и локальный поток победит.

Вот main.js:

const os = require('os');
const { Worker } = require('worker_threads');
const { performance } = require('perf_hooks');

const MAX_UINT32 = (2**32)-1;
const ARRAY_SIZE = 100000;
const ARRAY_COUNT = 12;
const workerCount = +process.argv[2] || os.cpus().length;
const raceCount = +process.argv[3] || 5;

class WorkerQueue {
    #workers;
    #available;
    #pending;
    #checkPending = () => { // private methods still aren't unflagged in v13, so...
        if (this.#available.length && this.#pending.length) {
            const resolve = this.#pending.shift();
            const worker = this.#available.shift();
            resolve(worker);
        }
    };

    constructor(...workers) {
        this.#workers = new Set(workers);
        this.#available = [...this.#workers];
        this.#pending = [];
    }

    get() {
        return new Promise(resolve => {
            this.#pending.push(resolve);
            this.#checkPending();
        });
    }

    release(worker) {
        if (!this.#workers.has(worker)) {
            throw new Error("Uknown worker");
        }
        this.#available.push(worker);
        this.#checkPending();
    }

    terminate() {
        for (const worker of this.#workers) {
            worker.terminate();
        }
        this.#workers = new Set();
        this.#available = [];
        this.#pending = [];
    }
}

const {workers, workerCreationTime} = createWorkers();

main();

function createWorkers() {
    const start = performance.now();
    const workers = new WorkerQueue(
        ...Array.from({length: workerCount}, () => new Worker("./worker.js"))
    );
    const workerCreationTime = performance.now() - start;
    return {workers, workerCreationTime};
}

async function main() {
    try {
        console.log(`Workers: ${workerCount} (in ${workerCreationTime}ms), races: ${raceCount}`);
        let localAverage = 0;
        let workersAverage = 0;
        for (let n = 1; n <= raceCount; ++n) {
            console.log(`Race #${n}:`);
            const {localTime, workersTime} = await sortRace();
            localAverage += localTime;
            workersAverage += workersTime;
        }
        // Include the time it took to create the workers in the workers' average, as
        // though we'd created them for each race. (We didn't because doing so would
        // have given the local thread an advantage: after the first race, it's warmed
        // up, but a new worker would be cold. So we let the workers be warm but add
        // the full creation time into each race.
        workersAverage += workerCreationTime;
        console.log("----");
        console.log(`# of workers:     ${workerCount}`);
        console.log(`Local average:    ${localAverage}ms`);
        console.log(`Workers' average: ${workersAverage}ms`);
        if (localAverage > workersAverage) {
            showWinner("Workers win", "local", workersAverage, localAverage);
        } else {
            showWinner("Local wins", "workers", localAverage, workersAverage);
        }
        workers.terminate();
    } catch (e) {
        console.error(e.message, e.stack);
    }
}

function showWinner(msg, loser, winnerAverage, loserAverage) {
    const percentage = (winnerAverage * 100) / loserAverage;
    console.log(`${msg}, taking ${percentage.toFixed(5)}% of the time ${loser} did`);
}

async function sortRace() {
    // Create a bunch of arrays for local to sort
    const localArrays = Array.from({length: ARRAY_COUNT}, () => createRandomArray(ARRAY_SIZE));
    // Copy those array so the workers are dealing with the same values
    const workerArrays = localArrays.map(array => new Uint32Array(array));

    const localStart = performance.now();
    const localResults = await Promise.all(localArrays.map(sortLocal));
    const localTime = performance.now() - localStart;
    checkResults(localResults);
    console.log(`Local time:    ${localTime}ms`);

    const workerStart = performance.now();
    const workersResults = await Promise.all(workerArrays.map(sortViaWorker));
    const workersTime = performance.now() - workerStart;
    checkResults(workersResults);
    console.log(`Workers' time: ${workersTime}ms`);

    return {localTime, workersTime};
}

async function sortLocal(array) {
    await Promise.resolve(); // To make it start asynchronously, like `sortViaWorker` does
    array.sort((a, b) => a - b);
    return array;
}

async function sortViaWorker(array) {
    const worker = await workers.get();
    return new Promise(resolve => {
        worker.once("message", result => {
            workers.release(worker);
            resolve(result.array);
        });
        worker.postMessage({array}, [array.buffer]);
    });
}

function checkResults(arrays) {
    for (const array of arrays) {
        const badIndex = array.findIndex((value, index) => index > 0 && array[index-1] > value);
        if (badIndex !== -1) {
            throw new Error(
                `Error, array entry ${badIndex} has value ${array[badIndex]} ` +
                `which is > previous value ${array[badIndex-1]}`
            );
        }
    }
}

function createRandomArray(length) {
    const array = new Uint32Array(Uint32Array.BYTES_PER_ELEMENT * length);
    return randomFillArray(array);
}

function randomFillArray(array) {
    for (let length = array.length, i = 0; i < length; ++i) {
        array[i] = Math.random() * MAX_UINT32;
    }
    return array;
}

и worker.js:

const { parentPort } = require("worker_threads");

parentPort.on("message", ({array}) => {
    array.sort((a, b) => a - b);
    parentPort.postMessage({array}, [array.buffer]);
});
0 голосов
/ 14 февраля 2020
  • 60000 может быть недостаточно, IP C раз имеет значение
  • кстати IP C: типы данных generi c JavaScript, включая массивы generi c JS, тяжелые при копировании на работников, но существуют двоичные типы массивов, https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypedArray
  • postMessage() имеет аргумент transfer, но он применяется только к ограниченному числу типов.

https://nodejs.org/api/worker_threads.html#worker_threads_port_postmessage_value_transferlist и https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage:

postMessage(value[, transferList])
узел : transferList может быть списком объектов ArrayBuffer и MessagePort. После передачи они больше не будут использоваться на передающей стороне канала (даже если они не содержатся в value). MDN : необязательный массив из Transferable объектов, для которых передается право собственности. Если право собственности на объект передается, оно становится непригодным (кастрированным) в контексте, из которого оно было отправлено, и становится доступным только работнику, которому оно было отправлено. Переносимые объекты - это экземпляры таких классов, как ArrayBuffer, MessagePort или ImageBitmap объекты, которые могут быть переданы.

Эффект типов:

let typ=prompt("Type: 0/1/2/3 (Array/Float64Array/Float32Array/Uint32Array)");
let len=parseInt(prompt("Length"));
let basearray;
switch(typ){
  case "1":basearray=new Float64Array(len);break;
  case "2":basearray=new Float32Array(len);break;
  case "3":basearray=new Uint32Array(len);break;
  default: basearray=new Array(len);break;
}
for(let i=0;i<basearray.length;i++)
  basearray[i]=Math.random()*0x1000000;

let cpus=4,
    chunksize=basearray.length/cpus,
    chunks=[],chunksw=[];
for(let i=0;i<cpus;i++)
  chunksw[i]=(chunks[i]=basearray.slice(i*chunksize,(i+1)*chunksize)).slice();

let start=Date.now();
for(let i=0;i<cpus;i++)
  chunks[i].sort((a,b)=>a-b);
console.log("Seq:",Date.now()-start);

let code="onmessage=event=>postMessage(event.data.sort((a,b)=>a-b));";
let ws=[],cnt=0;
for(let i=0;i<cpus;i++){
  ws[i]=new Worker("data:text/plain,"+escape(code));
  let j=i;
  ws[i].onmessage=event=>{
    chunksw[j]=event.data;
    if(++cnt===cpus){
      console.log("Par:",Date.now()-start);
      if(len<=20)
        for(let i=0;i<cpus;i++)
          console.log(chunks[i],chunksw[i]);
    }
  };
}
start=Date.now();
for(let i=0;i<cpus;i++)
  ws[i].postMessage(chunksw[i]);

Укажите длину, делимую на 4. Если длина равна 20 или меньше, результирующие отсортированные фрагменты будут также регистрироваться для целей проверки. JS Array -s надежно медленнее для меня при передаче (по сравнению с безпотоковым прогоном), независимо от того, содержат ли они 20 или 6000000 элементов (в то время как массив из 6 миллионов элементов JS работает в течение 8 секунд для мне на старом ноутбуке, все же может быть безопаснее начать с чего-то меньшего). Другие типы быстрее в потоке, Uint - самый быстрый.
На самом деле все, что не равно 1/2/3, приведет к JS Array (самому медленному), включая пустую строку.

Эффект передачи не настолько впечатляющий, но уже проявляется с самого начала (с 4 элементами это 59-69 мс против 20-22 мс на моем P C):

let typ=prompt("Type: 0/1/2 (Float64Array/Float32Array/Uint32Array)");
let len=parseInt(prompt("Length"));
let basearray;
switch(typ){
  case "1":basearray=new Float32Array(len);break;
  case "2":basearray=new Uint32Array(len);break;
  default:basearray=new Float64Array(len);
}
for(let i=0;i<basearray.length;i++)
  basearray[i]=Math.random()*0x1000000;

let cpus=4,
    chunksize=basearray.length/cpus,
    chunksw=[],chunkswt=[];
for(let i=0;i<cpus;i++)
  chunkswt[i]=(chunksw[i]=basearray.slice(i*chunksize,(i+1)*chunksize)).slice();

let start;
let code="onmessage=event=>postMessage(event.data.sort((a,b)=>a-b));";
let ws=[],cnt=0;
for(let i=0;i<cpus;i++){
  ws[i]=new Worker("data:text/plain,"+escape(code));
  let j=i;
  ws[i].onmessage=event=>{
    chunksw[j]=event.data;
    if(++cnt===cpus){
      console.log("Non-transfer:",Date.now()-start);
      // launch transfer measurement
      cnt=0;start=Date.now();
      for(let i=0;i<cpus;i++)
        wst[i].postMessage(chunkswt[i].buffer,[chunkswt[i].buffer]);    }
  };
}

let codet;
switch(typ){
  case "1":
    codet="onmessage=event=>{"+
          "let arr=new Float32Array(event.data);"+
          "arr.sort((a,b)=>a-b);"+
          "postMessage(event.data,[event.data]);};";
    break;
  case "2":
    codet="onmessage=event=>{"+
          "let arr=new Uint32Array(event.data);"+
          "arr.sort((a,b)=>a-b);"+
          "postMessage(event.data,[event.data]);};";
    break;
  default:
    codet="onmessage=event=>{"+
          "let arr=new Float64Array(event.data);"+
          "arr.sort((a,b)=>a-b);"+
          "postMessage(event.data,[event.data]);};";
}
let wst=[];
for(let i=0;i<cpus;i++){
  wst[i]=new Worker("data:text/plain,"+escape(codet));
  let j=i;
  wst[i].onmessage=event=>{
    switch(typ){
      case "1":chunkswt[j]=new Float32Array(event.data);break;
      case "2":chunkswt[j]=new Uint32Array(event.data);break;
      default:chunkswt[j]=new Float64Array(event.data);
    }
    if(++cnt===cpus){
      console.log("Transfer:",Date.now()-start);
      if(len<=20)
        for(let i=0;i<cpus;i++)
          console.log(chunksw[i],chunkswt[i]);
    }
  };
}

// launch non-transfer measurement
start=Date.now();
for(let i=0;i<cpus;i++)
  ws[i].postMessage(chunksw[i]);

Этот код немного запутан, поскольку может быть передан буфер, а не сами типизированные массивы, а также, пока второе измерение инициализируется как прямая копия -paste (который уже не так хорош), затем он запускается изнутри функции завершения первой.

(я не буду sh, чтобы предоставить точные результаты измерений, потому что мой P C тоже делает некоторые другие вещи. Просто запустите отрывки пару раз с различными или даже повторяющимися параметрами)

...