Поток FS2 работает до конца InputStream - PullRequest
0 голосов
/ 24 мая 2018

Я очень новичок в FS2 и мне нужна помощь по поводу дизайна.Я пытаюсь создать поток, который будет вытягивать куски из базового InputStream, пока он не закончится.Вот что я попробовал:

import java.io.{File, FileInputStream, InputStream}

import cats.effect.IO
import cats.effect.IO._

object Fs2 {

  def main(args: Array[String]): Unit = {
    val is = new FileInputStream(new File("/tmp/my-file.mf"))
    val stream = fs2.Stream.eval(read(is))
    stream.compile.drain.unsafeRunSync()
  }

  def read(is: InputStream): IO[Array[Byte]] = IO {
    val buf = new Array[Byte](4096)
    is.read(buf)
    println(new String(buf))
    buf
  }
}

И программа печатает только первый чанк.Это разумно.Но я хочу найти способ «сигнализировать», где прекратить чтение, а где не остановить.Я имею в виду, продолжай звонить read(is) до конца.Есть ли способ добиться этого?

Я также пытался repeatEval(read(is)), но он продолжает читать вечно ... Мне нужно что-то среднее.

1 Ответ

0 голосов
/ 24 мая 2018

Используйте fs2.io.readInputStream или fs2.io.readInputStreamAsync.Первый блокирует текущий поток;последний блокирует поток в ExecutionContext.Например:

val is: InputStream = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.io.readInputStreamAsync(IO(is), 128)
...