fs2 поток в сжатый zip fs2stream - PullRequest
       40

fs2 поток в сжатый zip fs2stream

0 голосов
/ 09 февраля 2019

У меня есть поток потоков fs2, и я хотел бы создать сжатый поток, готовый для записи в файл с расширением *.zip или для загрузки.

Проблема заключается в том, что поток никогда не завершается.Вот код:

package backup

import java.io.OutputStream

import cats.effect._
import cats.effect.implicits._
import cats.implicits._
import fs2.{Chunk, Pipe, Stream, io}
import java.util.zip.{ZipEntry, ZipOutputStream}
import fs2.concurrent.Queue

import scala.concurrent.{ExecutionContext, SyncVar}

// https://github.com/slamdata/fs2-gzip/blob/master/core/src/main/scala/fs2/gzip/package.scala
// https://github.com/scalavision/fs2-helper/blob/master/src/main/scala/fs2helper/zip.scala
// https://github.com/eikek/sharry/blob/2f1dbfeae3c73bf2623f65c3591d0b3e0691d4e5/modules/common/src/main/scala/sharry/common/zip.scala

object Fs2Zip {

  private def writeEntry[F[_]](zos: ZipOutputStream)(implicit F: Concurrent[F],
                                                     blockingEc: ExecutionContext,
                                                     contextShift: ContextShift[F]): Pipe[F, (String, Stream[F, Byte]), Unit] =
    _.flatMap {
      case (name, data) =>
        val createEntry = Stream.eval(F.delay {
          zos.putNextEntry(new ZipEntry(name))
        })
        val writeEntry = data.through(io.writeOutputStream(F.delay(zos.asInstanceOf[OutputStream]), blockingEc, closeAfterUse = false))
        val closeEntry = Stream.eval(F.delay(zos.closeEntry()))
        createEntry ++ writeEntry ++ closeEntry
    }

  private def zipP1[F[_]](implicit F: ConcurrentEffect[F],
                          blockingEc: ExecutionContext,
                          contextShift: ContextShift[F]): Pipe[F, (String, Stream[F, Byte]), Byte] = entries => {

    Stream.eval(Queue.unbounded[F, Option[Chunk[Byte]]]).flatMap { q =>
      Stream.suspend {
        val os = new java.io.OutputStream {

          private def enqueueChunkSync(a: Option[Chunk[Byte]]) = {
            println(s"enqueueChunkSync $a")
            val done = new SyncVar[Either[Throwable, Unit]]
            q.enqueue1(a).start.flatMap(_.join).runAsync(e => IO(done.put(e))).unsafeRunSync
            done.get.fold(throw _, identity)
            println(s"enqueueChunkSync done $a")
          }
          @scala.annotation.tailrec
          private def addChunk(c: Chunk[Byte]): Unit = {
            val free = 1024 - bufferedChunk.size
            if (c.size > free) {
              enqueueChunkSync(Some(Chunk.vector(bufferedChunk.toVector ++ c.take(free).toVector)))
              bufferedChunk = Chunk.empty
              addChunk(c.drop(free))
            } else {
              bufferedChunk = Chunk.vector(bufferedChunk.toVector ++ c.toVector)
            }
          }

          private var bufferedChunk: Chunk[Byte] = Chunk.empty

          override def close(): Unit = {
            // flush remaining chunk
            enqueueChunkSync(Some(bufferedChunk))
            bufferedChunk = Chunk.empty
            // terminate the queue
            enqueueChunkSync(None)
          }
          override def write(bytes: Array[Byte]): Unit =
            Chunk.bytes(bytes)
          override def write(bytes: Array[Byte], off: Int, len: Int): Unit =
            addChunk(Chunk.bytes(bytes, off, len))
          override def write(b: Int): Unit =
            addChunk(Chunk.singleton(b.toByte))
        }

        val write: Stream[F, Unit] = Stream
          .bracket(F.delay(new ZipOutputStream(os)))((zos: ZipOutputStream) => F.delay(zos.close()))
          .flatMap((zos: ZipOutputStream) => entries.through(writeEntry(zos)))

        val read = q.dequeue
          .unNoneTerminate
          .flatMap(Stream.chunk(_))

        read.concurrently(write)
      }
    }
  }

  def zip[F[_]: ConcurrentEffect: ContextShift](entries: Stream[F, (String, Stream[F, Byte])])(
      implicit ec: ExecutionContext): Stream[F, Byte] =
    entries.through(zipP1)
}

Код беззастенчиво скопирован из https://github.com/eikek/sharry/blob/master/modules/common/src/main/scala/sharry/common/zip.scala и обновлен для компиляции с последними версиями fs2 и cats-effect

Я сузил проблемудо enqueueChunkSync:

  private def enqueueChunkSync(a: Option[Chunk[Byte]]) = {
    val done = new SyncVar[Either[Throwable, Unit]]
    q.enqueue1(a).start.flatMap(_.join).runAsync(e => IO(done.put(e))).unsafeRunSync
    done.get.fold(throw _, identity)
  }

, который блокирует последний фрагмент.Когда я помещаю туда println и уменьшаю буфер, я вижу, что чанки удаляются успешно до последнего.
Когда я удаляю блокирующий бит done.get.fold(throw _, identity), он, кажется, работает, но потом я представляю, что байтывсе сразу залито в поток?
Чем последний кусок отличается от предыдущих?

...