Сохранить поток Doob ie из базы данных в файл - PullRequest
1 голос
/ 06 марта 2020

Doob ie select возвращает fs2.Stream(doobie.ConnectionIO, String). Если нам нужно записать его в файл, очевидным вариантом будет вызвать stream.compile.toList.transact(transactor) и затем сохранить этот список в файл.

Есть ли способ сохранить результат в потоковом режиме без преобразования его в список?

1 Ответ

2 голосов
/ 06 марта 2020

Хитрость заключается в том, чтобы преобразовать операции cats.IO в doobie.ConnectionIO с помощью Async[doobie.ConnectionIO].liftIO(IO(...)). Это позволяет красиво сочетать файловые операции с операциями с базой данных. Вот полный пример программы, которая передает результаты в файл.

package com.example

import java.io.BufferedWriter

import better.files.File
import cats.effect._
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.Stream


object Example extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val xa = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver",     // driver classname
      "jdbc:postgresql:example_db",     // connect URL (driver-specific)
      "postgres",                  // user
      ""                          // password
    )

    val drop = sql"drop table if exists example".update.run
    val create =
      sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
    val insert = Update[String]("insert into example (string_value) values (?)")
      .updateMany(List("one", "two", "three", "four", "five"))

    val setup = for {
      _ <- drop.transact(xa)
      _ <- create.transact(xa)
      _ <- insert.transact(xa)
    } yield ()

    val select: Stream[doobie.ConnectionIO, String] =
      sql"select string_value from example".query[String].stream
    val output = writeToFile(select).compile.drain.transact(xa)

    for {
      _ <- setup
      _ <- output
    } yield ExitCode.Success
  }

  private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
    Stream.resource(writer("./example.txt")).flatMap { writer =>
      result.intersperse("\n").chunks.evalMap { chunk =>
        Async[doobie.ConnectionIO].liftIO(IO(
          chunk.foreach(writer.write)
        ))
      }
    }
  }

  private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
    Resource.make {
      Async[doobie.ConnectionIO].liftIO(IO(
        File(path).newBufferedWriter
      ))
    } { outStream =>
      Async[doobie.ConnectionIO].liftIO(IO(
        outStream.close())
      )
    }
  }
}
...