как повторно использовать пул веб-серверов в node js для выполнения асинхронных c задач - PullRequest
2 голосов
/ 26 мая 2020

Я запускаю очередь из более чем 7000 асинхронных c задач параллельно, используя очередь asyn c с максимальным параллелизмом 18.

В каждой asyn c задача Мне нужно создать веб-сервер, поработать и закрыть веб-сервер. Я использую get-port , чтобы получить доступный порт, но я все еще попадаю в состояние гонки, когда две или более разных задач будут пытаться использовать один и тот же номер порта.

Моя идея состоит в том, чтобы создайте эти веб-серверы один раз и повторно используйте их в разных задачах. Таким образом, я не столкнусь с проблемами, когда несколько веб-серверов могут попытаться повторно использовать один и тот же порт.

Как я могу убедиться, что я НЕ повторно использую один и тот же веб-сервер между параллельными asyn c задачи?

Вот код, некоторые нерелевантные части отсутствуют:

const asyncTask = async (task) => {
    const port = await getPort()
    // static files server
    const server = http.createServer((request, response) => {
        // You pass two more arguments for config and middleware
        // More details here: https://github.com/zeit/serve-handler#options
        return serveHandler(request, response, {
            public: 'dist'
        })
    })
    // start listening
    server.listen(port, () => {
        console.log(`worker running for task ${task} at port ${port}`)
    })

    // do the async work

    server.close()

    return 0
}

const q = queue(asyncTask, 18)
q.drain(function() {
    console.log('all items have been processed')
})
q.error(function(err, task) {
    console.error(`task ${JSON.stringify(task)} experienced an error`, err)
})
q.push(tasks)

Ответы [ 2 ]

1 голос
/ 26 мая 2020

Повторное использование ваших веб-серверов может быть немного сложным, потому что вам нужно будет найти способ изменить обратный вызов (для обработки асинхронной c задачи) каждый раз, когда вы берете новую задачу, без воссоздания нового веб-сервера.

А как насчет использования фиксированной очереди портов и unshift() и pop() их после каждой задачи?

const ports = [3001, 3002, ..., 3018];

const asyncTask = async (task) => {

    while (ports.length === 0) {
      // Wait for a port to become free
    }

    const port = ports.pop();

    // static files server
    const server = http.createServer((request, response) => {
        // You pass two more arguments for config and middleware
        // More details here: https://github.com/zeit/serve-handler#options
        return serveHandler(request, response, {
            public: 'dist'
        })
    })

    // start listening
    server.listen(port, () => {
        console.log(`worker running for task ${task} at port ${port}`)
    })

    // do the async work

    server.close()

    // Once the work is done, add the port back to the list of ports, 
    // so that another task can pick it up again. This piece of code should be 
    ports.unshift(port);

    return 0
}

0 голосов
/ 27 мая 2020

В итоге я создал список веб-серверов и поделился ими между задачами asyn c. внутри каждой задачи я могу вытащить веб-сервер из списка, использовать его, а затем вернуть его обратно в общий список веб-серверов.

Я отказался от использования get-port , и вместо этого я используя аналогичный метод, основанный на github gist

function getAvailablePort(startPort = 10000) {
    let port = startPort

    return () => {
        const server = http.createServer()
        return new Promise((resolve, reject) => server
            .once('error', error => {
                port += 1
                error.code !== 'EADDRINUSE' ? reject(error) : server.listen(port)
            })
            .once('listening', () => server.close(() => {
                port += 1
                resolve(port)
            }))
            .listen(port)
        )
    }
}

Мой код теперь выглядит примерно так

(async () => {
    let totalTasks = 0
    let finishedSuccess = 0
    let finishedWithError = 0
    const concurrencyLimit = 18

    try {

        // create a pool of shared servers
        const getNextPort = getAvailablePort()
        const poolOfSharedServers = []
        for(let i=0; i<concurrencyLimit; i++) {
            let port = await getNextPort()

            // create static files server and start listening
            const server = http.createServer((request, response) => {
                // You pass two more arguments for config and middleware
                // More details here: https://github.com/zeit/serve-handler#options
                return serveHandler(request, response, { public: 'dist' })
            }).listen(port)

            poolOfSharedServers.push(server)
        }

        // create async task
        const asyncTask = createAsyncTask(poolOfSharedServers)

        // build an async queue of tasks
        const q = queue(async function(task) {
            try {
                const res = await asyncTask(task)
                if (res.done) {
                    finishedSuccess += 1
                    await logger.saveLog(res.value)
                } else {
                    finishedWithError += 1
                }
            } catch (err) {
                finishedWithError += 1
            }
        }, concurrencyLimit)

        // on all tasks finished
        q.drain(function() {
            console.log(`\nall ${totalTasks} tasks have been processed. ${finishedSuccess} finished successfully and ${finishedWithError} finished with errors\n`)
        })

        // on task error
        q.error(function(err, task) {
            finishedWithError += 1
            console.error(`task ${JSON.stringify(task)} experienced an error`, err)
        })

        // create tasks
        // tasks = ...

        totalTasks = tasks.length

        // push all tasks into the queue
        q.push(tasks)
    } catch (err) {
        console.error(err)
    }
})()

, где createAsyncTask выглядит примерно так:

const createAsyncTask = (poolOfSharedServers) => async (task) => {
    let result, server

    try {

        // get server out of shared pool of servers
        server = poolOfSharedServers.pop()

        // get current port
        const port = server.address().port

        // do some async work using task and server
        await new Promise(resolve => setTimeout(resolve, 1000))

        result = {
            done: true,
            value: 'some value'
        }
    } catch (err) {
        result = {
            done: false,
            err
        }
    }

    if (server) {
        // add server back to the pool of shared servers
        poolOfSharedServers.unshift(server)
    }

    return result
}
...