Мое решение состояло в том, чтобы создать обещание, которое обертывает поток преобразования, который читает данные, но не использует их, и хранит данные в массиве (включая обратный вызов выпуска). Когда было обнаружено paramsLen
, обещание было выполнено с объектом преобразования, затем канал был установлен, и, наконец, удержанные данные в потоке преобразования были удалены. См. Ниже:
// HTML Form parser middleware for dealing with file uploads
router.post("*", (req: Request, res: Response, next: NextFunction) => {
let busboy = new Busboy({ headers: req.headers });
busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
file.on("end", () => {
console.log("File [" + fieldname + "] Finished");
});
file.on("data", data => {
console.log("File [" + fieldname + "] got " + data.length + " bytes");
});
return new Promise((f, r) => {
let ts: {
dataArray: Array<[Buffer, Function]>;
paramsLen: number;
firstLine: string;
lineSplitReg: RegExp;
stream: Transform;
drainDone: boolean;
drain(): void;
} = {
dataArray: [],
paramsLen: undefined,
firstLine: "",
lineSplitReg: /[\n\Z]/,
drainDone: false,
drain: () => {
ts.dataArray.forEach(x => {
x[1](null, x[0]);
});
ts.drainDone = true;
},
stream: new Transform({
transform: (data: Buffer, enc, callback: Function) => {
// if drain finished pass data straight through
if (ts.drainDone) {
return callback(null, data);
}
ts.dataArray.push([data, callback]);
if (!ts.paramsLen) {
let strChunk = data.toString();
if (ts.lineSplitReg.test(strChunk)) {
ts.firstLine += strChunk.split(ts.lineSplitReg)[0];
ts.paramsLen = ts.firstLine.split(",").length;
f(ts);
} else {
// long line. contiune reading in next data chunk
ts.firstLine += strChunk;
}
}
},
}),
};
file.pipe(ts);
})
.then(ts => {
let headers: string[] = [
"id",
"brand",
"product",
"serialNumber",
"site",
"area",
"location",
"longitude",
"latitude",
];
// add extra config headers once paramsLen has been discovered
let cNum = 1;
for (let i = headers.length; i < paramsLen; i = i + 2) {
headers.push(`c${cNum}`);
headers.push(`v${cNum}`);
cNum++;
}
ts.stream.pipe(
csv({
headers,
}),
);
// drain transform stream
ts.drain();
})
});
busboy.on("finish", () => {
console.log("Done parsing form!");
if (!importingDevicesFromCsv) {
fulfill();
}
});
req.pipe(busboy);
})