Рабочие не прекратят работу в кластере NodeJS - PullRequest
0 голосов
/ 14 марта 2019

У меня есть программа, которая запускает веб-сокет, прослушивает фиксированное количество сообщений, анализирует их и записывает их на диск, а затем снова закрывает веб-сокет. Чтобы повысить производительность, я использовал NodeJS cluster для порождения нескольких рабочих, которые одновременно работают с этими сообщениями, где каждый рабочий открывает свой собственный веб-сокет и прослушивает поступающие сообщения.

Хотя это работает однопоточно и завершается после того, как все было закрыто, каким-то образом тот же код, выполняемый в работнике, вообще не завершается. Проблема в том, что я не могу использовать process.exit(0), так как я записываю много данных на диск, и я должен быть уверен, что все асинхронные потоки записи могут завершить свою работу до фактического выхода из программы.

Пример кода:

if (parseInt(command.workers, 10) === 1) {
    console.log("Starting Single Process...");
    singleProcess();
} else if (cluster.isMaster) {
    masterProcess();
} else {
    childProcess();
}

function masterProcess() {
    for (let i = 0; i < numWorkers; i++) {
        const worker = cluster.fork(configQueue.shift());
        workers.add(worker);

        cluster.on("exit", (exitedWorker, code, signal) => {
            // this is never reached
            console.log(`worker exited: ${exitedWorker.process.pid}`);
        }
    }
}

// never terminates
function childProcess() {
    console.log(`Worker ${process.pid} started`);
    const processor = MyProcessor();
    processor.run();
}

// does terminate
function singleProcess() {
    const processor = MyProcessor();
    processor.run();
}

Процессор, который я использую для анализа данных websocket, выглядит примерно так:

import WebSocket, {Data} from "ws";
import fs, {WriteStream} from "fs";

export class MyProcessor {

    private readonly ws: WebSocket;
    private readonly writer: WriteStream;

    public constructor() {
        this.ws = new WebSocket('...');
        this.writer = fs.createWriteStream('...');

        process.on('SIGTERM', () => {
            this.gracefulShutdown();
        });
    }

    public run() {
        this.ws.onopen = () => {
            this.logger.info("websocket opened");
        };

        this.ws.onclose = () => {
            this.logger.info("websocket closed");
            this.writer.close();
        };

        this.ws.onmessage = (event: { data: Data }) => {
            this.parse(message);
        };
    }

    public function close() {
        this.gracefulShutdown();
    }

    private function parse(message) {
        // parse message 
        this.writer.write(parsedMessage);

        // if limit reached, exit here
        if (...) this.close();
    }

    private function gracefulShutdown(): void {
        this.ws.close();
        process.exitCode = 0;       
    }
}

Я также использовал пакет why-is-node-running для поиска открытых обработчиков, но он мне ничего не показал. Как я могу убедиться, что эта программа корректно завершает работу при работе в качестве рабочего - без использования process.exit(), который убивает мой WriteStream до того, как все сообщения будут записаны на диск?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...