Перемещение файлов между корзинами S3 параллельно - PullRequest
2 голосов
/ 10 июня 2019

У меня есть две корзины S3 под разными аккаунтами. Я хочу переместить файлы из Bucket A в Bucket B, и я хочу сделать это как можно быстрее. Для этого я решил использовать Scala, асинхронные запросы и параллельную обработку для максимально быстрого перемещения файлов.

Для этого мне нужно вызвать команду listObjects (которая возвращает свое будущее, а затем для каждого объекта, возвращаемого этой командой, мне нужно последовательно запустить getObject, а затем putObject. Следовательно, затем listObjects должен порождает несколько фьючерсов getObject, и когда эти фьючерсы разрешаются, за ними следует фьючерс на putObject.

Я пытался сделать это примерно так:

def moveData(listObjects: Future[ListObjectV2Response]) = {
  listObjects.isCompleted {
    case Success(objListResp) =>
      val getAndPut = objListResp.objects()
                                 .map(obj => getObject(obj.key))
                                 .map(obj => putObject(obj.key))
      moveData(ListObjects(objlistResp.continuationToken())
    case Failure(error) => error.printStackTrace()
}

Я попробовал какой-то подход в этом аромате около 6 разных способов. Меня постоянно сбивают с толку следующие явления:

  1. .isCompleted имеет тип ответа Unit, и я не могу получить его с помощью рекурсивной функции.
  2. Мне часто нужно передавать различные значения, например, listObjects отвечает на запросы PutObject. Это приводит к передаче 3+ статических значений по цепочке .map в виде фьючерсов.
  3. listObjects - это одно будущее, но вызов .contents возвращает итерацию, которую нужно преобразовать во множество различных вариантов будущего. Это в сочетании с (2) приводит к некоторым по-настоящему хитрым случаям использования map и flatMap, когда scala хочет, чтобы карта / плоская карта была чем-то вроде Iterator[S3Object] => Future[NotInferred], когда фактический тип ответа равен Iterator[S3Object] => Iterator((Future[GetObject], Future[String]))

Как мне подойти к этой проблеме? Есть ли способ лучше?

EDIT:

Есть тысячи файлов, многие из них в нескольких ГБ. Всего копируемых данных исчисляется десятками терабайт. У меня крайне ограниченный доступ к корзине с исходным кодом и к учетной записи, в которой она находится. У меня никогда не будет возможности делать что-либо кроме операций Get и List.

Ответы [ 2 ]

2 голосов
/ 10 июня 2019

Если два сегмента находятся в различных регионах , вы можете использовать Межрегиональная репликация Amazon S3 .

Если они находятся в одном регионе , самый быстрый способ получить скопированные объекты:

  • Создать Amazon S3 Event для запуска функции AWS Lambda
  • The Лямбда-функция получит Bucket и ключ объекта, вызвавшего событие
  • Лямбда-функция должна скопировать объект в другое ведро с помощью команды CopyObject()

Преимущество этого метода в том, что нет необходимости перечислять объекты в корзине , потому что лямбда-функция будет запускаться для каждого нового объекта.

0 голосов
/ 11 июня 2019

Мне удалось написать чрезвычайно грубую функцию, которая технически выполняется, хотя я получаю ошибки, возникающие из-за проблем параллелизма.Пишу здесь для потомков, но мне не нравится этот ответ.


  def streamData(response: Future[ListObjectsV2Response]): Future[ListObjectsV2Response] = {
    var continuationToken: String = ""
    val operationChain = response.map((res: ListObjectsV2Response) => {
      println("LISTED - " + res.maxKeys())
      continuationToken = res.nextContinuationToken()
      res.contents().asScala.toList
    }).map((objs: List[S3Object]) => {
        for {
          obj <- objs
          fileName = obj.key().split("/").last
          getObjectRequest = GetObjectRequest.builder().bucket(BucketA).key(obj.key()).build()
          writeFilePath = Paths.get("./" + fileName)
          future = BucketAS3.getObject(getObjectRequest, writeFilePath).toScala
        } yield {
          (future, Future(obj.key()), Future(writeFilePath))
        }
    }).map((futureSeq: Seq[(Future[GetObjectResponse], Future[String], Future[Path])]) =>
      futureSeq.map((futureTuple: (Future[GetObjectResponse], Future[String], Future[Path])) => {
        for {
          getObjResp <- futureTuple._1
          key <- futureTuple._2
          writeFilePath <- futureTuple._3
        } yield {
          println("DOWNLOADED - " + key)
          val putObjectRequest = PutObjectRequest.builder()
            .bucket(bucketB).key("ability_dump/" + key).build()
          (BucketBS3.putObject(putObjectRequest, writeFilePath).toScala, Future(key), Future(writeFilePath))
        }
      })
    ).map((futuresSeq: Seq[Future[(Future[PutObjectResponse], Future[String], Future[Path])]]) => {
      futuresSeq.map((futures: Future[(Future[PutObjectResponse], Future[String], Future[Path])]) => {
        for {
          f <- futures
          putObjResp <- f._1
          key <- f._2
          writeFilePath <- f._3
        } yield {
          println("UPLOADED - " + key)
          val writeFile = new File(writeFilePath.toString)
          if (writeFile.exists) {
            writeFile.delete()
          }
          println("DELETED  - " + writeFilePath.toString)
        }
      })
    })
    streamData(BucketAS3.listObjectsV2(abilityListRequestBuilder.continuationToken(continuationToken).build()).toScala)
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...