Как разделить очередь ZIO между задачами ZIO через среду ZIO - PullRequest
5 голосов
/ 01 октября 2019

Я немного новичок в Scala и ZIO и столкнулся с какой-то странной загадкой.

Я хотел бы настроить среду ZIO, содержащую очередь ZIO, и позже иметь разные задачи ZIO offer иtake из этой общей очереди.

Я попытался определить свое окружение следующим образом

    trait MainEnv extends Console with Clock
    {
      val mainQueue = Queue.unbounded[String]
    }

и получить доступ к очереди из отдельных задач, подобных этому

    for {
      env      <- ZIO.environment[MainEnv]
      queue    <- env.mainQueue
      ...

, но в своем тесте я наблюдаю каждое из своих отдельныхЗадачам дается отдельный экземпляр Queue.
Просмотр подписи для unbounded

  def unbounded[A]: UIO[Queue[A]]

Я вижу, что она не сразу возвращает очередь, а скорее возвращает эффект, которыйвозвращает очередь, поэтому, в то время как наблюдаемое поведение имеет смысл, это совсем не то, на что я надеялся, и я не вижу четкого способа получить желаемое поведение.

Буду признателен за любые предложения относительнокак достичь своей цели установки различных задач, взаимодействующих через общую очередь, хранящуюся в среде.


Для справки приведен мой код и выходные данные.

пример выполнения

bash-3.2$ sbt run
[info] Loading project definition from /private/tmp/env-q/project
[info] Loading settings for project root from build.sbt ...
[info] Set current project to example (in build file:/private/tmp/env-q/)
[info] Compiling 1 Scala source to /private/tmp/env-q/target/scala-2.12/classes ...
[info] Done compiling.
[info] Packaging /private/tmp/env-q/target/scala-2.12/example_2.12-0.0.1-SNAPSHOT.jar ...
[info] Done packaging.
[info] Running example.Main 
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@65b9a444
env example.Main$$anon$1@36fbcafd queue zio.Queue$$anon$1@7c050764

(висит здесь - обратите внимание, что объект env такой же, но объекты очереди разные, поэтому вторая задача застряла)

/ tmp / env-q / test.scala

Здесьмой полный тест, которыйоснован на примере из слайда 37: https://www.slideshare.net/jdegoes/zio-queue

    package example
    import zio.{App, Queue, ZIO}
    import zio.blocking.Blocking
    import zio.clock.Clock
    import zio.console._

    trait MainEnv extends Console with Clock    // environment with queue
    {
      val mainQueue = Queue.unbounded[String]
    }

    object Main extends App                     // main test
    {
      val task1 = for {                         // task to add something to the queue
        env      <- ZIO.environment[MainEnv]
        queue    <- env.mainQueue
        _        <- putStrLn(s"env $env queue $queue")
        _        <- queue.offer("Give me Coffee!")
      } yield ()

      val task2 = for {                         // task to remove+print stuff from queue
        env      <- ZIO.environment[MainEnv]
        queue    <- env.mainQueue
        _        <- putStrLn(s"env $env queue $queue")
        _        <- queue.take.flatMap(putStrLn)
      } yield ()

      val program = ZIO.runtime[MainEnv]        // top level to run both tasks
        .flatMap {
          implicit rts =>
            for {
              _ <- task1.fork
              _ <- task2
            } yield ()
        }

      val runEnv = new MainEnv with Console.Live with Clock.Live

      def run(args: List[String]) =
        program.provide(runEnv).fold(_ => 1, _ => 0)
    }

/ tmp / env-q / build.sbt

Вот тот build.sbt, который я использовал

val ZioVersion = "1.0.0-RC13"

lazy val root = (project in file("."))
  .settings(
    organization := "example",
    name := "example",
    version := "0.0.1-SNAPSHOT",
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq("-Ypartial-unification"),
    libraryDependencies ++= Seq(
      "dev.zio"                 %% "zio"                 % ZioVersion,
    ),
    addCompilerPlugin("org.spire-math" %% "kind-projector"     % "0.9.6"),
    addCompilerPlugin("com.olegpy"     %% "better-monadic-for" % "0.2.4")
  )

scalacOptions ++= Seq(
  "-deprecation",               // Emit warning and location for usages of deprecated APIs.
  "-encoding", "UTF-8",         // Specify character encoding used by source files.
  "-language:higherKinds",      // Allow higher-kinded types
  "-language:postfixOps",       // Allows operator syntax in postfix position (deprecated since Scala 2.10)
  "-feature",                   // Emit warning and location for usages of features that should be imported explicitly.
  "-Ypartial-unification",      // Enable partial unification in type constructor inference
  "-Xfatal-warnings",           // Fail the compilation if there are any warnings
)

1 Ответ

5 голосов
/ 01 октября 2019

На официальном канале Gitter для ZIO Core Адам Фрейзер предложил

Если вы хотите, чтобы в вашей среде было только Queue[String], а затем вы захотите использоватьтакой метод, как provideM с Queue.unbounded, чтобы создать одну очередь и предоставить ее всему вашему приложению. Вот где приходит provideM в отличие от provide. Это позволяет вам удовлетворить среду, которая требует A, предоставив ZIO[A].

Небольшое копание в источнике ZIO выявилополезный пример в DefaultTestReporterSpec.scala .

Определив среду как

  trait MainEnv extends Console with Clock    // environment with queue
  {
    val mainQueue: Queue[String]
  }

, изменив мои задачи для доступа к env.mainQueue с = вместо <- (потому что mainQueue - это Queue[String] сейчас, а не UIO[Queue[String]], удаление runEnv и изменение метода run в моем тесте для использования provideSomeM

  def run(args: List[String]) =
    program.provideSomeM(
      for {
        q <- Queue.unbounded[String]
      } yield new MainEnv with Console.Live with Clock.Live {
        override val mainQueue = q
      }
    ).fold(_ => 1, _ => 0)

Мне удалось получитьожидаемый результат:

sbt:example> run
[info] Running example.Main 
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
env example.Main$$anon$1@45bfc0da queue zio.Queue$$anon$1@13b73d56
Give me Coffee!
[success] Total time: 1 s, completed Oct 1, 2019 7:41:47 AM
...