У меня есть поток потоков fs2, и я хотел бы создать сжатый поток, готовый для записи в файл с расширением *.zip
или для загрузки.
Проблема заключается в том, что поток никогда не завершается.Вот код:
package backup
import java.io.OutputStream
import cats.effect._
import cats.effect.implicits._
import cats.implicits._
import fs2.{Chunk, Pipe, Stream, io}
import java.util.zip.{ZipEntry, ZipOutputStream}
import fs2.concurrent.Queue
import scala.concurrent.{ExecutionContext, SyncVar}
// https://github.com/slamdata/fs2-gzip/blob/master/core/src/main/scala/fs2/gzip/package.scala
// https://github.com/scalavision/fs2-helper/blob/master/src/main/scala/fs2helper/zip.scala
// https://github.com/eikek/sharry/blob/2f1dbfeae3c73bf2623f65c3591d0b3e0691d4e5/modules/common/src/main/scala/sharry/common/zip.scala
object Fs2Zip {
private def writeEntry[F[_]](zos: ZipOutputStream)(implicit F: Concurrent[F],
blockingEc: ExecutionContext,
contextShift: ContextShift[F]): Pipe[F, (String, Stream[F, Byte]), Unit] =
_.flatMap {
case (name, data) =>
val createEntry = Stream.eval(F.delay {
zos.putNextEntry(new ZipEntry(name))
})
val writeEntry = data.through(io.writeOutputStream(F.delay(zos.asInstanceOf[OutputStream]), blockingEc, closeAfterUse = false))
val closeEntry = Stream.eval(F.delay(zos.closeEntry()))
createEntry ++ writeEntry ++ closeEntry
}
private def zipP1[F[_]](implicit F: ConcurrentEffect[F],
blockingEc: ExecutionContext,
contextShift: ContextShift[F]): Pipe[F, (String, Stream[F, Byte]), Byte] = entries => {
Stream.eval(Queue.unbounded[F, Option[Chunk[Byte]]]).flatMap { q =>
Stream.suspend {
val os = new java.io.OutputStream {
private def enqueueChunkSync(a: Option[Chunk[Byte]]) = {
println(s"enqueueChunkSync $a")
val done = new SyncVar[Either[Throwable, Unit]]
q.enqueue1(a).start.flatMap(_.join).runAsync(e => IO(done.put(e))).unsafeRunSync
done.get.fold(throw _, identity)
println(s"enqueueChunkSync done $a")
}
@scala.annotation.tailrec
private def addChunk(c: Chunk[Byte]): Unit = {
val free = 1024 - bufferedChunk.size
if (c.size > free) {
enqueueChunkSync(Some(Chunk.vector(bufferedChunk.toVector ++ c.take(free).toVector)))
bufferedChunk = Chunk.empty
addChunk(c.drop(free))
} else {
bufferedChunk = Chunk.vector(bufferedChunk.toVector ++ c.toVector)
}
}
private var bufferedChunk: Chunk[Byte] = Chunk.empty
override def close(): Unit = {
// flush remaining chunk
enqueueChunkSync(Some(bufferedChunk))
bufferedChunk = Chunk.empty
// terminate the queue
enqueueChunkSync(None)
}
override def write(bytes: Array[Byte]): Unit =
Chunk.bytes(bytes)
override def write(bytes: Array[Byte], off: Int, len: Int): Unit =
addChunk(Chunk.bytes(bytes, off, len))
override def write(b: Int): Unit =
addChunk(Chunk.singleton(b.toByte))
}
val write: Stream[F, Unit] = Stream
.bracket(F.delay(new ZipOutputStream(os)))((zos: ZipOutputStream) => F.delay(zos.close()))
.flatMap((zos: ZipOutputStream) => entries.through(writeEntry(zos)))
val read = q.dequeue
.unNoneTerminate
.flatMap(Stream.chunk(_))
read.concurrently(write)
}
}
}
def zip[F[_]: ConcurrentEffect: ContextShift](entries: Stream[F, (String, Stream[F, Byte])])(
implicit ec: ExecutionContext): Stream[F, Byte] =
entries.through(zipP1)
}
Код беззастенчиво скопирован из https://github.com/eikek/sharry/blob/master/modules/common/src/main/scala/sharry/common/zip.scala и обновлен для компиляции с последними версиями fs2
и cats-effect
Я сузил проблемудо enqueueChunkSync
:
private def enqueueChunkSync(a: Option[Chunk[Byte]]) = {
val done = new SyncVar[Either[Throwable, Unit]]
q.enqueue1(a).start.flatMap(_.join).runAsync(e => IO(done.put(e))).unsafeRunSync
done.get.fold(throw _, identity)
}
, который блокирует последний фрагмент.Когда я помещаю туда println
и уменьшаю буфер, я вижу, что чанки удаляются успешно до последнего.
Когда я удаляю блокирующий бит done.get.fold(throw _, identity)
, он, кажется, работает, но потом я представляю, что байтывсе сразу залито в поток?
Чем последний кусок отличается от предыдущих?