Я пытаюсь реализовать приложение, которое управляет камерой. Команды камеры представлены в виде потока объектов CameraAction:
sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage
...
val s = Stream[F, CameraMessage]
Допустим, у меня есть тестовый поток, который выдает «Запись» и выдает «Стоп» через 20 секунд, через еще 20 секунд другое сообщение «Запись» и так далее, входной поток бесконечен.
Затем приложение потребляет «Запись», оно должно создать экземпляр конвейера GStreamer (т.е. это эффект) и «запустить» его на «Стоп» он «останавливает» конвейер и закрывает его. Затем при последующей «записи» шаблон повторяется с новым конвейером GStreamer.
Проблема в том, что мне нужно передать экземпляр нечистого, изменчивого объекта между дескрипторами потоковых событий.
Документация FS2 предлагает использовать чанки для создания потока с состоянием, поэтому я попытался
def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] =
{
... create and open pipeline ...
}
def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
... stop pipeline, release resources ...
}
def effectPipe(pipelineDef: String)(implicit L: Logger[F]):
Pipe[F, CameraMessage, F[Unit]] = {
type CameraSessionHandle = Pipeline
type CameraStream = Stream[F, CameraSessionHandle]
s: Stream[F, CameraMessage] =>
s.scanChunks(Stream[F, CameraSessionHandle]()) {
case (s: CameraStream, c: Chunk[CameraMessage]) =>
c.last match {
case Some(Record(fileName)) =>
(Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
case Some(StopRecording) =>
(Stream.empty, Chunk(s.compile.drain))
case _ =>
(s, Chunk.empty)
}
}
}
Проблема с этим кодом в том, что фактическая запись не происходит при событии 'Запись', а вместо оценивается влияние всего фрагмента, т. е. когда приходит сообщение «StopRecording», камера включается, а затем немедленно снова выключается.
Как я могу передать "состояние" без чанкинга? Или есть какой-то другой способ добиться нужного мне результата?
Это может быть похоже на FS2 Stream с StateT [IO, _, _], периодически сбрасывающим состояние , но разница в том, что в моем случае состояние - это не чистая структура данных, а ресурс.