Потоковая загрузка нескольких файлов из S3 в виде zip через Akka HTTP или Play - PullRequest
0 голосов
/ 30 июня 2019

У меня есть структура S3, которая является результатом задания Spark, которое записывает разделенные CSV-файлы, как показано ниже.

bucketA
  output
    cleaned-data1
      part000....csv
      part001....csv
      part002....csv
    cleaned-data2
      .....

Мне нужно иметь возможность иметь конечную точку HTTP Akka, которая указывает на имя выходного файла, чтобы загрузить все детали в виде zip-файла: https://..../download/cleaned-data1.

Когда вызывается эта конечная точка, в идеале я хочу:

  1. Открыть поток zip с сервера в браузер клиента

  2. Открытие файлов деталей и потоковая передача байтов в поток zip напрямую клиенту без какой-либо буферизации на сервере, чтобы избежать проблемы с памятью

Общий размер всех частей может составлять до 30 ГБ без сжатия.

Есть ли способ сделать это через Akka Stream, Akka HTTP или Play? Могу ли я использовать библиотеку Alpakka?

Отредактировано временно, основываясь на ответе Рамона:

  def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
    bucketNameToKeySource(bucket)
      .map(key => S3.download(bucket, key))
      .map(x => x.map(y => y.fold(Source.empty[ByteString])(_._1)))
      .flatMapConcat(identity)
      .flatMapConcat(identity)

1 Ответ

0 голосов
/ 02 июля 2019

Первым шагом является создание akka потока Source содержимого корзины :

type Key = String

def bucketNameToKeySource(bucket : String) : Source[Key, _] = 
  S3.listBucket(bucket, None)
    .map(_.key)

Теперь это можно комбинировать с возможностями загрузки S3 и flatMapConcat:

def bucketNameToFileContents(bucket : String) : Source[ByteString, _] = 
  bucketNameToKeySource(bucket)
    .map(key => S3.download(bucket, key))
    .map(_.getOrElse(Source.empty[ByteString])
    .flatMapConcat(identity)

Эта функция теперь может быть включена в ваш Route.Вопрос требует "открыть поток zip с сервера на клиент", поэтому используется encodeRespose:

def bucketNameToRoute(parentBucketName : String) : Route = 
  encodeResponse {
    path ("download" / Segment) { childBucketName =>

      val bucketName = parentBucketName + "/" + childBucketName

      val byteStrSource = bucketNameToFileContents(bucketName)

      complete(OK -> byteStrSource)
    } 
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...