FS2 передает ресурс (или эффект) как состояние - PullRequest
1 голос
/ 11 января 2020

Я пытаюсь реализовать приложение, которое управляет камерой. Команды камеры представлены в виде потока объектов CameraAction:

sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage

...

val s = Stream[F, CameraMessage]

Допустим, у меня есть тестовый поток, который выдает «Запись» и выдает «Стоп» через 20 секунд, через еще 20 секунд другое сообщение «Запись» и так далее, входной поток бесконечен.

Затем приложение потребляет «Запись», оно должно создать экземпляр конвейера GStreamer (т.е. это эффект) и «запустить» его на «Стоп» он «останавливает» конвейер и закрывает его. Затем при последующей «записи» шаблон повторяется с новым конвейером GStreamer.

Проблема в том, что мне нужно передать экземпляр нечистого, изменчивого объекта между дескрипторами потоковых событий.

Документация FS2 предлагает использовать чанки для создания потока с состоянием, поэтому я попытался


def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] = 
{ 
... create and open pipeline ... 
}

def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
 ... stop pipeline, release resources ... 
}

def effectPipe(pipelineDef: String)(implicit L: Logger[F]): 
Pipe[F, CameraMessage, F[Unit]] = {
    type CameraSessionHandle = Pipeline
    type CameraStream = Stream[F, CameraSessionHandle]

    s: Stream[F, CameraMessage] =>
      s.scanChunks(Stream[F, CameraSessionHandle]()) {
        case (s: CameraStream, c: Chunk[CameraMessage]) =>
          c.last match {
            case Some(Record(fileName)) =>
              (Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
            case Some(StopRecording) =>
              (Stream.empty, Chunk(s.compile.drain))
            case _ =>
              (s, Chunk.empty)
          }
      }
  }

Проблема с этим кодом в том, что фактическая запись не происходит при событии 'Запись', а вместо оценивается влияние всего фрагмента, т. е. когда приходит сообщение «StopRecording», камера включается, а затем немедленно снова выключается.

Как я могу передать "состояние" без чанкинга? Или есть какой-то другой способ добиться нужного мне результата?

Это может быть похоже на FS2 Stream с StateT [IO, _, _], периодически сбрасывающим состояние , но разница в том, что в моем случае состояние - это не чистая структура данных, а ресурс.

1 Ответ

2 голосов
/ 17 января 2020

В конце концов я смог решить эту проблему, используя шаблон изменяемой ссылки, как описано в https://typelevel.org/blog/2018/06/07/shared-state-in-fp.html

Вот код:

import cats.effect._
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.language.higherKinds

class FRef[F[_], T](implicit sync: Sync[F]) {
  private var state: T = _
  def set(n: T): F[Unit] = sync.delay(this.state = n)
  def get: F[T] = sync.pure(state)
}

object FRef {
  def apply[F[_], T](implicit sync: Sync[F]): F[FRef[F, T]] = sync.delay { new FRef() }
}

class CameraImpl(id: String) extends Camera {

  override def record(): Unit = {
    println(s"Recording $id")
  }

  override def stop(): Unit = {
    println(s"Stopping $id")
  }

  override def free(): Unit = {
    Thread.sleep(500)
    println(s"Freeing $id")
  }
}

object Camera {
  def apply(id: String) = new CameraImpl(id)
}

trait Camera {
  def record(): Unit
  def stop(): Unit
  def free(): Unit
}

sealed trait CameraMessage
case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage

class Streamer[F[_]](implicit sync: Sync[F]) {

  def record(id: String): F[Camera] = {
    sync.delay {
      val r = Camera(id)
      r.record()
      r
    }
  }

  def stopRecording(pipe: Camera): F[Unit] = {
    sync.delay {
      pipe.stop()
      pipe.free()
    }
  }

  def effectPipe(state: FRef[F, Option[Camera]])(
      implicit sync: Sync[F]): Stream[F, CameraMessage] => Stream[F, Unit] = {
    type CameraStream = Stream[F, Camera]

    s: Stream[F, CameraMessage] =>
      s.evalMap {
        case Record(fileName) =>
          for {
            r <- record(fileName)
            _ <- state.set(Some(r))
          } yield ()
        case StopRecording =>
          for {
            s <- state.get
            _ <- stopRecording(s.get)
            _ <- state.set(None)
          } yield ()
      }
  }
}

object FS2Problem extends IOApp {
  import scala.concurrent.duration._

  override def run(args: List[String]): IO[ExitCode] = {

    implicit val ec: ExecutionContextExecutor = ExecutionContext.global

    val streamer = new Streamer[IO]

    val s = Stream.awakeEvery[IO](5.seconds).take(10).zipWithIndex.map {
      case (_, idx) =>
        idx % 2 match {
          case 0 =>
            Record(s"Record $idx")
          case _ =>
            StopRecording
        }
    }

    val ss = for {
      streamerState <- Stream.eval(FRef[IO, Option[Camera]])
      s <- s.through(streamer.effectPipe(streamerState))
    } yield ()

    ss.compile.drain.map(_ => ExitCode.Success)
  }
}

...