Как избавиться от «лишнего» будущего на этапах Akka Stream Flow? - PullRequest
0 голосов
/ 24 апреля 2018

Я работаю с Akka Streams и пытаюсь использовать этапы Flow для наиболее эффективного способа описания всего графика. На некоторых этапах я посылаю сообщения актерам по шаблону ask.

Конечно, когда я использую шаблон ask, мне нужно использовать mapTo, чтобы получить ожидаемый тип для дальнейшей обработки.

Вот пример:

val runnableGraph = Source.single(CheckEntity(entity))
  .map { check =>
    (postgresActor ? check)
      .mapTo[CheckEntityResult]
      .map {
        case failure: PostgresFailure => Left(failure.message)
        case pEntity: PEntity => Right(check)
    }
  }
  .map {
    _.map {
      case Left(msg) => Future(Left(msg))
      case Right(check) =>
        (redisActor ? pEntity)
          .mapTo[CheckEntityResult]
          .map {
            case failure: RedisFailure => Left(failure.message)
            case rEntity: REntity => Right(rEntity)
          }
    }
  }
  .toMat(Sink.head)(Keep.right)

//The result's type is Future[Future[Either[String, Entity]]]
val futureResult = runnableGraph.run()

Как мне избавиться от вложенного Future между этапами?

Ответы [ 2 ]

0 голосов
/ 25 апреля 2018

Вы можете рассмотреть преобразование вашего запроса актера в Flow вместе с mapAsync (с соответствующим параллелизмом):

val postgresCheck = (check: CheckEntity) =>
    (postgresActor ? check).mapTo[CheckEntityResult]
      .map {
        case failure: PostgresFailure => Left(failure.message)
        case pEntity: PEntity => Right(check)
      }

val redisCheck = (e: Either[String, CheckEntityResult]) => e match {
    case Left(msg) => Future(Left(msg))
    case Right(checkResult) =>
      (redisActor ? checkResult).mapTo[CheckEntityResult]
        .map {
          case failure: RedisFailure => Left(failure.message)
          case rEntity: REntity => Right(rEntity)
        }
  }

val postgresCheckFlow = (parallelism: Int) =>
    Flow[CheckEntity]
      .mapAsync[Either[String, CheckEntityResult]](parallelism)(postgresCheck)

val redisCheckFlow = (parallelism: Int) =>
    Flow[Either[String, CheckEntityResult]]
      .mapAsync[Either[String, CheckEntityResult]](parallelism)(redisCheck)

С преобразованными потоками ваш runnableGraph может быть собран как показано ниже с типом результата Future[Either[]]:

val runnableGraph = Source.single(CheckEntity(entity))
  .via(postgresCheckFlow(parallelism))
  .via(redisCheckFlow(parallelism))
  ...
0 голосов
/ 25 апреля 2018

Чтобы упростить распространение элемента CheckEntity в потоке, нужно изменить класс CheckEntityResult на соответствующий экземпляр CheckEntity.Это будет выглядеть примерно так:

abstract class CheckEntityResult(entity: CheckEntity) extends Entity

case class PEntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class PostgresFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)

case class REntity(entity: CheckEntity) extends CheckEntityResult(entity)
case class RedisFailure(entity: CheckEntity, message: String) extends CheckEntityResult(entity)

Затем, после настройки ваших акторов для обработки этих сообщений, вы можете использовать Source # ask и mapAsync (настроить уровни параллелизма какнеобходимо) взаимодействовать с актерами и избегать вложенного Future в материализованном значении:

implicit val askTimeout = Timeout(5.seconds)

val runnableGraph = Source.single(CheckEntity(entity))
  .ask[CheckEntityResult](parallelism = 3)(postgresActor)
  .map {
    case PostgresFailure(_, msg) => msg
    case PEntity(e) => e
  }
  .mapAsync(parallelism = 3) {
    case failureMsg: String => Future.successful(failureMsg)
    case e: CheckEntity => (redisActor ? e).mapTo[CheckEntityResult]
  }
  .map {
    case failureMsg: String => Left(failureMsg)
    case RedisFailure(_, msg) => Left(msg)
    case r: REntity => Right(r)
  }
  .toMat(Sink.head)(Keep.right)

val futureResult = runnableGraph.run() // Future[Either[String, Entity]]
...