Чтобы упростить распространение элемента CheckEntity
в потоке, нужно изменить класс CheckEntityResult
на соответствующий экземпляр CheckEntity
.Это будет выглядеть примерно так:
abstract class CheckEntityResult(entity: CheckEntity) extends Entity
case class PEntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class PostgresFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)
case class REntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class RedisFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)
Затем, после настройки ваших акторов для обработки этих сообщений, вы можете использовать Source # ask
и mapAsync
(настроить уровни параллелизма какнеобходимо) взаимодействовать с актерами и избегать вложенного Future
в материализованном значении:
implicit val askTimeout = Timeout(5.seconds)
val runnableGraph = Source.single(CheckEntity(entity))
.ask[CheckEntityResult](parallelism = 3)(postgresActor)
.map {
case PostgresFailure(_, msg) => msg
case PEntity(e) => e
}
.mapAsync(parallelism = 3) {
case failureMsg: String => Future.successful(failureMsg)
case e: CheckEntity => (redisActor ? e).mapTo[CheckEntityResult]
}
.map {
case failureMsg: String => Left(failureMsg)
case RedisFailure(_, msg) => Left(msg)
case r: REntity => Right(r)
}
.toMat(Sink.head)(Keep.right)
val futureResult = runnableGraph.run() // Future[Either[String, Entity]]