Как отложить инициализацию канала до тех пор, пока значение не будет обнаружено в потоке? - PullRequest
0 голосов
/ 18 июня 2020

Итак, я использую busboy в качестве промежуточного программного обеспечения для потоковой передачи данных формы с CSV-файлами на моем express сервере. Эти файлы CSV могут иметь различное количество дополнительных параметров конфигурации, поэтому мне нужно проанализировать первую строку, чтобы определить, сколько параметров существует, прежде чем инициализировать канал для csv-parser . Мой подход выглядит примерно так:

// HTML Form parser middleware for dealing with file uploads
router.post("*", (req: Request, res: Response, next: NextFunction) => {

    let busboy = new Busboy({ headers: req.headers });

    busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
        file.on("end", () => {
            console.log("File [" + fieldname + "] Finished");
        });

        // number of CSV parameters to be found by splitting first line
        let paramsLen: number;

        // first line varible. Outside data callback incase first line is split over multiple data chunks
        let firstLine = "";

        // line split regex. works from new line and EOF
        const lineSplitReg: RegExp = /[\n\Z]/;

        return new Promise((f, r) => {
          file.on("data", data => {
              console.log("File [" + fieldname + "] got " + data.length + " bytes");
              if (!paramsLen) {
                  let strChunk = data.toString();
                  if (lineSplitReg.test(strChunk)) {
                      firstLine += strChunk.split(lineSplitReg)[0];
                      paramsLen = firstLine.split(",").length;

                      // paramsLen now found! init pipe to csv writeable
                      f();

                  } else {
                      // long line. contiune reading in next data chunk
                      firstLine += strChunk;
                  }
              }
          });
        })
        .then(() => {
          let headers: string[] = [
              "id",
              "brand",
              "product",
              "serialNumber",
              "site",
              "area",
              "location",
              "longitude",
              "latitude",
          ];

          // add extra config headers once paramsLen has been discovered
          let cNum = 1;
          for (let i = headers.length; i < paramsLen; i = i + 2) {
              headers.push(`c${cNum}`);
              headers.push(`v${cNum}`);
              cNum++;
          }

          file.pipe(
              csv({
                headers,
              }),
          );
        })
    });

    busboy.on("finish", () => {
        console.log("Done parsing form!");
        if (!importingDevicesFromCsv) {
            fulfill();
        }
    });

    req.pipe(busboy);
})

Проблема в том, что к тому времени, когда обещание выполнено, читаемый поток файла уже потребил некоторые или все данные файла, что означает, что эти фрагменты никогда не передаются в csv читаемый поток. Итак, как я могу прочитать данные потока, но не использовать их до тех пор, пока не будет установлен канал к парсеру csv, учитывая, что нам, возможно, придется заранее прочитать несколько блоков данных?

1 Ответ

0 голосов
/ 18 июня 2020

Мое решение состояло в том, чтобы создать обещание, которое обертывает поток преобразования, который читает данные, но не использует их, и хранит данные в массиве (включая обратный вызов выпуска). Когда было обнаружено paramsLen, обещание было выполнено с объектом преобразования, затем канал был установлен, и, наконец, удержанные данные в потоке преобразования были удалены. См. Ниже:

// HTML Form parser middleware for dealing with file uploads
router.post("*", (req: Request, res: Response, next: NextFunction) => {

    let busboy = new Busboy({ headers: req.headers });

    busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
        file.on("end", () => {
            console.log("File [" + fieldname + "] Finished");
        });

        file.on("data", data => {
            console.log("File [" + fieldname + "] got " + data.length + " bytes");
        });

        return new Promise((f, r) => {

          let ts: {
              dataArray: Array<[Buffer, Function]>;
              paramsLen: number;
              firstLine: string;
              lineSplitReg: RegExp;
              stream: Transform;
              drainDone: boolean;
              drain(): void;
          } = {
              dataArray: [],
              paramsLen: undefined,
              firstLine: "",
              lineSplitReg: /[\n\Z]/,
              drainDone: false,
              drain: () => {
                  ts.dataArray.forEach(x => {
                      x[1](null, x[0]);
                  });
                  ts.drainDone = true;
              },
              stream: new Transform({
                  transform: (data: Buffer, enc, callback: Function) => {
                      // if drain finished pass data straight through
                      if (ts.drainDone) {
                          return callback(null, data);
                      }

                      ts.dataArray.push([data, callback]);

                      if (!ts.paramsLen) {
                          let strChunk = data.toString();
                          if (ts.lineSplitReg.test(strChunk)) {
                              ts.firstLine += strChunk.split(ts.lineSplitReg)[0];
                              ts.paramsLen = ts.firstLine.split(",").length;
                              f(ts);
                          } else {
                              // long line. contiune reading in next data chunk
                              ts.firstLine += strChunk;
                          }
                      }
                  },
              }),
          };

          file.pipe(ts);
        })
        .then(ts => {
          let headers: string[] = [
              "id",
              "brand",
              "product",
              "serialNumber",
              "site",
              "area",
              "location",
              "longitude",
              "latitude",
          ];

          // add extra config headers once paramsLen has been discovered
          let cNum = 1;
          for (let i = headers.length; i < paramsLen; i = i + 2) {
              headers.push(`c${cNum}`);
              headers.push(`v${cNum}`);
              cNum++;
          }

          ts.stream.pipe(
            csv({
                headers,
            }),
          );

          // drain transform stream
          ts.drain();
        })
    });

    busboy.on("finish", () => {
        console.log("Done parsing form!");
        if (!importingDevicesFromCsv) {
            fulfill();
        }
    });

    req.pipe(busboy);
})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...