Потоковая ссылка на локальный файл с помощью fs2 - PullRequest
0 голосов
/ 07 апреля 2019

Используя fs2 (версия 1.0.4) и эффект кошки IO, я могу транслировать URL в локальный файл,

import concurrent.ExecutionContext.Implicits.global

def download(spec: String, filename: String): Stream[IO, Unit] = 
  io.readInputStream((new URL(spec).openConnection.getInputStream), 4096, global, true)
    .through(io.file.writeAll(Paths.get(filename), global))

Однако этот фрагмент кода не возвращает никакой информации о процессе, когда он завершен. Кроме того, помимо того, что я знаю, что операция прошла успешно или нет, я также хочу узнать, сколько байтов считывается, если операция прошла успешно. Я не хочу проверять новый размер файла, чтобы получить эту информацию. С другой стороны, если операция не удалась, я хочу знать, что является причиной сбоя.

Я попытался attempt, но не смог разрешить последующие шаги, чтобы записать необработанные байты в новый файл. Пожалуйста, порекомендуйте. Спасибо

Ответы [ 2 ]

0 голосов
/ 16 апреля 2019

Я нашел решение в терминах Resource и IO и предложения @codenoodle.

Обновление # 1

Resource удалено, поскольку оно избыточно при использовании с FS2 и усложняет код.

import java.io.{
  FileNotFoundException,
  FileOutputStream,
  InputStream,
  OutputStream
}

import java.net.URL

import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2._

import scala.concurrent.ExecutionContext.Implicits.global

object LetStream extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    def write(source: IO[InputStream], target: IO[OutputStream]) =
      io.readInputStream(source, 4096, global)
        .chunks
        .flatMap(Stream.chunk)
        .observe(io.writeOutputStream(target, global))
        .chunks
        .fold(0L)((acc, chunk) => acc + chunk.size)

    write(IO(new FileOutputStream("image.jpg")), 
      IO(new URL("http://localhost:8080/images/helloworld.jpg")
            .openConnection
            .getInputStream))
      .use(_.compile.toList)
      .map {size =>
          println(s"Written ${size.head} bytes")
          ExitCode.Success
      }
      .recover {
        case t: FileNotFoundException =>
          println(s"Not found, ${t.getMessage}")
          ExitCode.Error
        case err =>
          println(err.getMessage)
          ExitCode.Error
      }
  }
}
0 голосов
/ 09 апреля 2019

Возможно, вы захотите поиграть с observe. Я уверен, что есть лучший способ сделать это, но вот пример, который может помочь вам разобраться:

Ваш оригинальный код, который компилируется и выполняется:

import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global

import java.net.URL
import java.nio.file.Paths

object Example1 {
  implicit val contextShift: ContextShift[IO] = IO.contextShift(global)

  def download(spec: String, filename: String): fs2.Stream[IO, Unit] =
    io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse=true)
      .through(io.file.writeAll(Paths.get(filename), global))

  def main(args: Array[String]): Unit = {
    download("https://isitchristmas.com/", "/tmp/christmas.txt")
      .compile.drain.unsafeRunSync()
  }
}

Использование наблюдения для подсчета байтов:

import fs2.io
import cats.effect.{IO, ContextShift}
import concurrent.ExecutionContext.Implicits.global

import java.net.URL
import java.nio.file.Paths

object Example2 {
  implicit val contextShift: ContextShift[IO] = IO.contextShift(global)

  final case class DlResults(bytes: Long)

  def download(spec: String, filename: String): fs2.Stream[IO, DlResults] =
    io.readInputStream[IO](IO(new URL(spec).openConnection.getInputStream), 4096, global, closeAfterUse = true)
      .observe(io.file.writeAll(Paths.get(filename), global))
      .fold(DlResults(0L)) { (r, _) => DlResults(r.bytes + 1) }

  def main(args: Array[String]): Unit = {
    download("https://isitchristmas.com/", "/tmp/christmas.txt")
      .compile
      .fold(()){ (_, r) => println(r)}
      .unsafeRunSync()
  }
}

Выход:

> DlResults(42668)
...