Как я могу составить ресурсы в Scala, все еще закрывая их правильно с scala -arm? - PullRequest
1 голос
/ 23 января 2020

У меня есть класс, который берет локальный файл, преобразует его и сохраняет его в GCS:

import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    if (destination.unzipGzip) {
      for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
           output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
        ByteStreams.copy(input, output)
      }
    } else if (destination.decompressBzip2) {
      for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        ByteStreams.copy(input, output)
      }
    } else {
      for (input <- managed(Files.newInputStream(localPath));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        IOUtils.copy(input, output)
      }
    }
  }

}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

Я пытаюсь удалить некоторое дублирование кода, в частности создание fileInputStream и gcsOutputStream. Но я не могу просто извлечь эти переменные в верхней части метода, потому что это создаст ресурсы вне блока scala -arm managed:

import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    // FIXME: creates a resource outside of the ARM block
    val fileInputStream = Files.newInputStream(localPath)
    val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))

    if (destination.unzipGzip) {
      unzipGzip(fileInputStream, gcsOutputStream)
    } else if (destination.decompressBzip2) {
      decompressBzip2(fileInputStream, gcsOutputStream)
    } else {
      copy(fileInputStream, gcsOutputStream)
    }
  }

  private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input ← managed(new ZipInputStream(inputStream));
         output ← managed(new GZIPOutputStream(outputStream))) {
      ByteStreams.copy(input, output)
    }
  }

  private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new BZip2CompressorInputStream(inputStream));
         output <- managed(outputStream)) {
      ByteStreams.copy(input, output)
    }
  }

  private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(inputStream);
         output <- managed(outputStream)) {
      IOUtils.copy(input, output)
    }
  }
}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

Как видите, код намного понятнее и тестируемее, но ресурсы обрабатываются неправильно, так как они не "управляемы". Например, если при создании gcsOutputStream возникает исключение, fileInputStream не будет закрыто.

Возможно, я мог бы решить эту проблему, используя источники и приемники Google Guava , но я Мне интересно, есть ли лучший способ сделать это в Scala, не вводя Гуава. В идеале, используя стандартную библиотеку, или функцию scala -arm, или, может быть, даже в Cats?

  • Должен ли я определять fileInputStream и gcsOutputStream как функции, которые ничего не принимают и возвращают поток? Кажется, код будет более многословным с () => InputStream и () => OutputStream везде?
  • Должен ли я использовать несколько scala -arm "managed" для пониманий (один для определения fileInputStream и gcsOutputStream, и еще один внутри каждой подфункции)? Если я это сделаю, разве не проблема, что «внутренний» входной поток будет закрыт дважды?
  • Существует ли чистый и «скалярный» 1036 * подход к выполнению этого, которого я не вижу?

1 Ответ

1 голос
/ 23 января 2020

Вы можете изменить его следующим образом:

Сначала объявите управляемые ресурсы:

val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))

Это не открывает эти ресурсы, это просто объявление, что вы хотите, чтобы эти ресурсы были удалось.

Тогда вы можете использовать map, чтобы обернуть их в нужные декораторы (например, ZipInputStream):

if (destination.unzipGzip) {
  for (input ← fileInputStream.map(s => new ZipInputStream(s));
       output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
} else if (destination.decompressBzip2) {
  for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
       output <- gcsOutputStream) {
    ByteStreams.copy(input, output)
  }
} else {
  for (input <- fileInputStream;
       output <- gcsOutputStream) {
    IOUtils.copy(input, output)
  }
}

Конечно, ManagedResource[A] - это просто значение, так что вы можете даже передать его методу в качестве параметра:

private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
  for (input ← inputStream.map(s => new ZipInputStream(s));
       output ← outputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
}
...