Альтернатива использованию Future.sequence внутри Akka Actors - PullRequest
2 голосов
/ 05 августа 2020

У нас есть довольно сложная система, разработанная с использованием Akka HTTP и модели Actors. До сих пор мы широко использовали шаблон запроса и смешанные фьючерсы и субъекты.

Например, субъект получает сообщение, ему необходимо выполнить 3 операции параллельно, объединить результат из этих данных и вернуть его отправителю. Мы использовали

  1. , чтобы объявить новую переменную в обратном вызове сообщения актора, чтобы сохранить отправителя (поскольку мы используем Future.map, это может быть другой отправитель).
  2. выполнил все эти 3 фьючерса параллельно с использованием Future.sequence (иногда это вызов функции, которая возвращает будущее, а иногда спрашивает другому актору, чтобы получить что-то от него)
  3. объединить результат всех 3 фьючерсов с использованием map или flatMap функции Future.sequence result
  4. конвейера конечный результат отправителю с помощью pipeTo

Вот упрощенный код:

case RetrieveData(userId, `type`, id, lang, paging, timeRange, platform) => {
      val sen = sender

      val result: Future[Seq[Map[String, Any]]] = if (paging.getOrElse(Paging(0, 0)) == Paging(0, 0)) Future.successful(Seq.empty)
      else {
        val start = System.currentTimeMillis()

        val profileF = profileActor ? Get(userId)

        Future.sequence(Seq(profileF, getSymbols(`type`, id), getData(paging, timeRange, platform)).map { result =>
          logger.info(s"Got ${result.size} news in ${System.currentTimeMillis() - start} ms")
          result
        }.recover { case ex: Throwable =>
          logger.error(s"Failure on getting data: ${ex.getMessage}", ex)
          Seq.empty
        }
      }

      result.pipeTo(sen)
    }

Функция getAndProcessData содержит Future.sequence с параллельным выполнением 3 фьючерсов.

Теперь, когда я читаю больше и многое другое об Akka, я вижу, что использование ask создает еще одного слушателя актера. Вот вопросы:

  1. Как мы часто используем ask, может ли это привести к большому количеству потоков, используемых в системе, и, возможно, иногда к их нехватке?
  2. Использование Future.map также означает разная нить часто. Я читал об одной иллюзии актера потока, которую можно легко сломать с помощью смешивания Futures.
  3. Кроме того, может ли это плохо повлиять на производительность?
  4. Нужно ли нам сохранять отправителя во временной переменной send, раз уж мы используем pipeTo? Можем ли мы сделать только pipeTo (отправитель). Кроме того, объявление sen почти в каждом приеме обратного вызова тратит много ресурсов? Я ожидаю, что его ссылка будет удалена после завершения операции.
  5. Есть ли шанс спроектировать такую ​​систему лучше, не используя карту и не прося так много? Я рассмотрел примеры, когда вы просто передаете ссылку replyTo на некоторого актера и используете tell вместо ask. Кроме того, отправка сообщения самому себе, а затем ответ исходному отправителю может заменить работу с Future.map в некоторых сценариях ios. Но как его разработать, имея в виду, что мы хотим выполнять 3 операции asyn c параллельно и возвращать отформатированные данные отправителю? Чтобы можно было форматировать данные, нам необходимо выполнить все эти 3 операции.

Я старался не включать во многие примеры, надеюсь, вы понимаете наши опасения и проблемы. Много вопросов, но мне бы очень хотелось понять, как это работает, просто и понятно

Заранее спасибо

Ответы [ 3 ]

2 голосов
/ 05 августа 2020

Если вы хотите выполнять 3 операции параллельно, вам нужно будет создать 3 Future значений, которые потенциально будут использовать 3 потока, и этого нельзя избежать.

Я не уверен в чем проблема с map, но в этом коде есть только один вызов, и в этом нет необходимости.

Вот один способ очистить код, чтобы избежать создания ненужных значений Future (непроверено! ):

case RetrieveData(userId, `type`, id, lang, paging, timeRange, platform) =>
  if (paging.forall(_ == Paging(0, 0))) {
    sender ! Seq.empty
  } else {
    val sen = sender
    val start = System.currentTimeMillis()

    val resF = Seq(
      profileActor ? Get(userId),
      getSymbols(`type`, id),
      getData(paging, timeRange, platform),
    )

    Future.sequence(resF).onComplete {
      case Success(result) =>
        val dur = System.currentTimeMillis() - start
        logger.info(s"Got ${result.size} news in $dur ms")

        sen ! result
      case Failure(ex)
        logger.error(s"Failure on getting data: ${ex.getMessage}", ex)

        sen ! Seq.empty
    }
  }

Вы можете избежать ask, создав свой собственный рабочий поток, который собирает различные результаты и затем отправляет результат отправителю, но это, вероятно, сложнее, чем здесь требуется.

1 голос
/ 05 августа 2020

Актер использует поток в диспетчере только при обработке сообщения. Поскольку количество сообщений, создаваемых актором для управления запросом, равно одному, очень маловероятно, что шаблон запроса сам по себе вызовет голодание потока. Если вы уже очень близки к истощению потоков, запрос может быть соломинкой, которая сломает спину верблюда.

Смешивание Future s и актеров может разрушить иллюзию однопоточного потока, если и только если код выполнение в Future обращается к состоянию актора (что означает, в основном, var s или изменяемые объекты, определенные вне обработчика receive).

запрос-ответ и хотя бы один раз (между ними, они покрывают, по крайней мере, большую часть мотивации модели запроса), в целом, ограничивают пропускную способность по сравнению с жестами, действующими не более одного раза. Реализация запрос-ответ или хотя бы один раз без шаблона запроса может в некоторых ситуациях (например, использование replyTo ActorRef для конечного получателя) быть меньше накладных расходов, чем запросы конвейера, но, вероятно, не значительно. Запросы в качестве основной точки входа в систему-актор (например, в потоках, обрабатывающих HTTP-запросы или обрабатывающих сообщения из некоторой шины сообщений), как правило, в порядке, но запросы от одного актора к другому - хорошая возможность для оптимизации.

Обратите внимание, что, особенно если ваш актер импортирует context.dispatcher как неявный ExecutionContext, преобразования на Future s в основном идентичны одноразовым актерам.

Ситуации, в которых вы хотите, чтобы произошло несколько вещей (особенно когда вам нужно управлять частичным отказом (Future.sequence.recover - возможный признак этой ситуации, особенно если recover становится нетривиальным)) являются потенциальными кандидатами на роль актера саги для организации одного конкретного запроса / ответа.

0 голосов
/ 06 августа 2020

Я бы посоветовал вместо использования Future.sequence использовать Souce из Akka, который будет запускать все фьючерсы параллельно, в котором вы также можете обеспечить параллелизм. Вот пример кода:

Source.fromIterator( () => Seq(profileF, getSymbols(`type`, id), getData(paging, timeRange, platform)).iterator )
        .mapAsync( parallelism = 1 ) { case (seqIdValue, row) =>
          row.map( seqIdValue -> _ )
        }.runWith( Sink.seq ).map(_.map(idWithDTO => idWithDTO))

Это вернет Future [Seq [Map [String, Any]]]

...