У меня есть программа, которая запускает веб-сокет, прослушивает фиксированное количество сообщений, анализирует их и записывает их на диск, а затем снова закрывает веб-сокет. Чтобы повысить производительность, я использовал NodeJS cluster
для порождения нескольких рабочих, которые одновременно работают с этими сообщениями, где каждый рабочий открывает свой собственный веб-сокет и прослушивает поступающие сообщения.
Хотя это работает однопоточно и завершается после того, как все было закрыто, каким-то образом тот же код, выполняемый в работнике, вообще не завершается. Проблема в том, что я не могу использовать process.exit(0)
, так как я записываю много данных на диск, и я должен быть уверен, что все асинхронные потоки записи могут завершить свою работу до фактического выхода из программы.
Пример кода:
if (parseInt(command.workers, 10) === 1) {
console.log("Starting Single Process...");
singleProcess();
} else if (cluster.isMaster) {
masterProcess();
} else {
childProcess();
}
function masterProcess() {
for (let i = 0; i < numWorkers; i++) {
const worker = cluster.fork(configQueue.shift());
workers.add(worker);
cluster.on("exit", (exitedWorker, code, signal) => {
// this is never reached
console.log(`worker exited: ${exitedWorker.process.pid}`);
}
}
}
// never terminates
function childProcess() {
console.log(`Worker ${process.pid} started`);
const processor = MyProcessor();
processor.run();
}
// does terminate
function singleProcess() {
const processor = MyProcessor();
processor.run();
}
Процессор, который я использую для анализа данных websocket, выглядит примерно так:
import WebSocket, {Data} from "ws";
import fs, {WriteStream} from "fs";
export class MyProcessor {
private readonly ws: WebSocket;
private readonly writer: WriteStream;
public constructor() {
this.ws = new WebSocket('...');
this.writer = fs.createWriteStream('...');
process.on('SIGTERM', () => {
this.gracefulShutdown();
});
}
public run() {
this.ws.onopen = () => {
this.logger.info("websocket opened");
};
this.ws.onclose = () => {
this.logger.info("websocket closed");
this.writer.close();
};
this.ws.onmessage = (event: { data: Data }) => {
this.parse(message);
};
}
public function close() {
this.gracefulShutdown();
}
private function parse(message) {
// parse message
this.writer.write(parsedMessage);
// if limit reached, exit here
if (...) this.close();
}
private function gracefulShutdown(): void {
this.ws.close();
process.exitCode = 0;
}
}
Я также использовал пакет why-is-node-running
для поиска открытых обработчиков, но он мне ничего не показал. Как я могу убедиться, что эта программа корректно завершает работу при работе в качестве рабочего - без использования process.exit()
, который убивает мой WriteStream
до того, как все сообщения будут записаны на диск?