Процесс Node.js завершается без ошибок (используя потоки) - PullRequest
0 голосов
/ 20 ноября 2018

Я пишу лямбда-функцию, которая получает список текстовых файлов на S3, объединяет их вместе, а затем архивирует полученный файл. По какой-то причине функция выполняется в середине процесса без ошибок.

Полезная нагрузка, отправляемая в функцию Lambda, выглядит следующим образом:

{
  "sourceFiles": [
    "s3://bucket/largefile1.txt",
    "s3://bucket/largefile2.txt"
  ],
  "destinationFile": "s3://bucket/concat.zip",
  "compress": true,
  "omitHeader": false,
  "preserveSourceFiles": true
}

Сценарии, в которых эта функция работает совершенно нормально:

  1. Два файла маленькие, и compress === false
  2. Два файла маленькие, и compress === true
  3. Два файла большие, и compress === false

Если я попытаюсь сжать два больших файла, он выйдет посередине. Сам процесс конкатенации работает нормально, но когда он пытается использовать zip-stream для добавления потока в архив, происходит сбой.

Два больших файла вместе составляют 483 833 байта. Когда лямбда-функция завершается ошибкой, она считывает либо 290,229, либо 306,589 байт (это случайно), а затем завершается.

Это основная точка входа функции:

const packer = require('zip-stream');
const S3 = require('aws-sdk/clients/s3');
const s3 = new S3({ apiVersion: '2006-03-01' });
const { concatCsvFiles } = require('./csv');
const { s3UrlToParts } = require('./utils');

function addToZip(archive, stream, options) {
  return new Promise((resolve, reject) => {
    archive.entry(stream, options, (err, entry) => {
      console.log('entry done', entry);
      if (err) reject(err);
      resolve(entry);
    });
  });
}

export const handler = async event => {
  /**
   * concatCsvFiles returns a readable stream to pass to either the archiver or
   * s3.upload.
   */
  let bytesRead = 0;

  try {
    const stream = await concatCsvFiles(event.sourceFiles, {
      omitHeader: event.omitHeader,
    });
    stream.on('data', chunk => {
      bytesRead += chunk.length;
      console.log('read', bytesRead, 'bytes so far');
    });
    stream.on('end', () => {
      console.log('this is never called :(');
    });
    const dest = s3UrlToParts(event.destinationFile);
    let archive;

    if (event.compress) {
      archive = new packer();

      await addToZip(archive, stream, { name: 'concat.csv' });
      archive.finalize();
    }

    console.log('uploading');
    await s3
      .upload({
        Body: event.compress ? archive : stream,
        Bucket: dest.bucket,
        Key: dest.key,
      })
      .promise();

    console.log('done uploading');

    if (!event.preserveSourceFiles) {
      const s3Objects = event.sourceFiles.map(s3Url => {
        const { bucket, key } = s3UrlToParts(s3Url);

        return {
          bucket,
          key,
        };
      });

      await s3
        .deleteObjects({
          Bucket: s3Objects[0].bucket,
          Delete: {
            Objects: s3Objects.map(s3Obj => ({ Key: s3Obj.key })),
          },
        })
        .promise();
    }

    console.log('## Never gets here');

    // return {
    //   newFile: event.destinationFile,
    // };
  } catch (e) {
    if (e.code) {
      throw new Error(e.code);
    }

    throw e;
  }
};

И это код конкатенации:

import MultiStream from 'multistream';
import { Readable } from 'stream';
import S3 from 'aws-sdk/clients/s3';
import { s3UrlToParts } from './utils';

const s3 = new S3({ apiVersion: '2006-03-01' });

/**
 * Takes an array of S3 URLs and returns a readable stream of the concatenated results
 * @param {string[]} s3Urls Array of S3 URLs
 * @param {object} options Options
 * @param {boolean} options.omitHeader Omit the header from the final output
 */
export async function concatCsvFiles(s3Urls, options = {}) {
  // Get the header so we can use the length to set an offset in grabbing files
  const firstFile = s3Urls[0];
  const file = s3UrlToParts(firstFile);
  const data = await s3
    .getObject({
      Bucket: file.bucket,
      Key: file.key,
      Range: 'bytes 0-512', // first 512 bytes is pretty safe for header size
    })
    .promise();
  const streams = [];
  const [header] = data.Body.toString().split('\n');

  for (const s3Url of s3Urls) {
    const { bucket, key } = s3UrlToParts(s3Url);

    const stream = s3
      .getObject({
        Bucket: bucket,
        Key: key,
        Range: `bytes=${header.length + 1}-`, // +1 for newline char
      })
      .createReadStream();
    streams.push(stream);
  }

  if (!options.omitHeader) {
    const headerStream = new Readable();
    headerStream.push(header + '\n');
    headerStream.push(null);
    streams.unshift(headerStream);
  }

  const combinedStream = new MultiStream(streams);
  return combinedStream;
}

1 Ответ

0 голосов
/ 20 ноября 2018

Понял. Проблема была на самом деле с библиотекой zip-stream. Очевидно, это не работает с потоковой передачей S3 +. Я пытался yazl, и он отлично работает.

...