Использовать потоки в многочастной загрузке на S3 - PullRequest
0 голосов
/ 03 апреля 2020

Текущий проект, над которым я работаю, требует, чтобы несколько процессов загружали данные в один файл в S3. Эти данные поступают из нескольких источников параллельно, поэтому для максимально быстрой обработки всех источников мы будем использовать несколько экземпляров Nodejs для прослушивания источников. Существуют ограничения по памяти и хранилищу, поэтому загрузить все загруженные данные в память или сохранить на диске, а затем выполнить одну загрузку не подлежит обсуждению.

Для соблюдения этих ограничений я реализовал потоковую загрузку: она буферизует небольшую часть данных из одного источника и направляет буфер в поток загрузки. Это работает очень хорошо при использовании одного nodejs процесса, но, как я уже говорил, цель состоит в том, чтобы обрабатывать все источники параллельно. Моей первой попыткой было открыть несколько потоков для одного и того же ключа объекта в корзине. Это просто переопределяет файл с данными из последнего потока для закрытия. Поэтому я отказался от этой опции.

// code for the scenario above, where each process will open a separete stream to
// the same key and perform it's data ingestion and upload.
openStreamingUpload() {
  const stream = require('stream');
  const AWS = require('aws-sdk');
  const s3 = new this.AWS.S3(/* s3 config */);

  const passThrough = new stream.PassThrough();

  const params = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    Body: passThrough
  };

  s3
    .upload(params)
    .promise();

  return passThrough;
}

async main() { // simulating a "never ending" flow of data
  const stream = openStreamingUpload();
  let data = await receiveData();;

  do {
    stream.write(data);
    data = await receiveData();
  } while(data);
  stram.close();
}
main();

Затем я попробовал многочастную загрузку , которую предлагает S3 API. Сначала я создаю многокомпонентную загрузку, получаю ее идентификатор и сохраняю ее в общем пространстве. После этого я пытаюсь открыть многократную загрузку для всех процессов nodejs, которые будет использовать кластер, с одним и тем же UploadId, полученным заранее. Каждая из этих составных загрузок должна иметь поток, который будет передавать полученные данные. Проблема, с которой я столкнулся, заключалась в том, что при многоэтапной загрузке необходимо заранее знать длину части, и, поскольку я передаю поток, который не знаю, когда закроется, или объем данных, который будет передан, невозможно рассчитать его размер. , Код, вдохновленный этой реализацией .

  const AWS = require('aws-sdk');
  const s3 = new this.AWS.S3(/* s3 config */);
  async startMultipartUpload()
    const multiPartParams = {
      Key: 'final-s3-file.txt',
      Bucket: 'my-bucket'
    };
    const multipart = await s3.createMultipartUpload(multiPartParams).promise();

    return multipart.UploadId;
  }

  async finishMultipartUpload(multipartUploadId) {
    const finishingParams = {
      Key: 'final-s3-file.txt',
      Bucket: 'my-bucket',
      UploadId: multipartUploadId
    };
    const data = await s3.completeMultipartUpload(finishingParams).promise();

    return data;
  }

  async openMultiparStream(multipartUploadId) {
    const stream = require('stream');
    const passThrough = new stream.PassThrough();

    const params = {
      Body: passThrough.,
      Key: 'final-s3-file.txt',
      Bucket: 'my-bucket',
      UploadId: multipartUploadId,
      PartNumber: // how do I know this part number when it's, in principle, unbounded?
    };

    s3
      .uploadPart(params)
      .promise();

    return passThrough 
  }

 // a single process will start the multipart upload
 const uploadId startMultipartUpload();
 async main() { // simulating a "never ending" flow of data
   const stream = openMultiparStream(uploadId);
   let data = await receiveData();;

   do {
     stream.write(data);
     data = await receiveData();
   } while(data);
   stram.close();
 }

 main(); // all the processes will receive and upload to the same UploadId
 finishMultipartUpload(uploadId); // only the last process to closm will finish the multipart upload.

Осматривая, я наткнулся на статью из AWS, представляющую метод upload() API, и говорит, что он абстрагирует multipart API, чтобы разрешить использование потоковых данных для загрузки больших файлов. Поэтому мне интересно, возможно ли получить uploadId из потоковой «простой» загрузки, чтобы я мог передать этот Id вокруг кластера и загрузить в тот же объект, сохраняя при этом характеристики потоковой передачи c. Кто-нибудь когда-нибудь пробовал этот тип сценария «многопотоковой» загрузки?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...