Как оптимизировать пропускную способность для вызовов REST API в akka-потоке - PullRequest
0 голосов
/ 30 марта 2019

В настоящее время я изучаю потоки akka и пытаюсь реализовать простой поток, который получает элемент из источника и вызывает REST API для каждого элемента.

Упрощенная версия моего кода выглядит так:

source.mapAsync(parallelism){ item =>
    Http().singleRequest(HttpRequest(HttpMethods.GET, "http://myserver:8080/$item"))
        .flatMap(response => response.entity.toStrict(20.seconds))
}

Мне интересно, как установить parallelism для получения максимальной пропускной способности, если узким местом является REST-сервер.

Насколько я понимаю, если мы предположим, что сервер не может обрабатывать запрос параллельно, я мог бы использовать parallelism = 1 в первом приближении и мой поток будет отправлять один запрос за другим. Если сервер может обрабатывать n запросов параллельно, поток не будет использовать это, и мне придется установить parallelism = n, чтобы получить лучшая пропускная способность.

Теперь я могу поиграть с parallelism, чтобы оптимизировать пропускную способность для моей установки в данный момент времени. Однако myserver может быть за балансировщиком нагрузки и автоматическим масштабированием. Тогда количество параллельных запросов, доступных для потока, будет зависеть от времени и, возможно, моя программа не единственный пользователь REST API.

Теперь мой вопрос: каков наилучший подход для вызова REST API, когда я хочу использовать myserver настолько хорошо, насколько это возможно, но не хотите сокрушить это, если оно находится под давлением или еще не увеличено?

Ответы [ 3 ]

0 голосов
/ 31 марта 2019

На самом деле, это своего рода встроенная функция для потоков, так как они используют противодавление.Поэтому, как только остальной сервер достигнет предела, ответы будут занимать больше времени, и ваш поток будет запрашивать меньше запросов от источника.Таким образом, вам на самом деле не нужно оптимизировать параллелизм для текущего состояния запрошенного сервера, а настроить его для хорошей пропускной способности.Для любого запрошенного сервера это будет настолько быстро, насколько ваш запрошенный сервер сможет ответить.С помощью аналогичного Graph, использованного выше, я смог довести кластер из 2 узлов до его пределов, а также 16 узлов с масштабированием, встроенных в кластер, поэтому график работал чисто для обоих состояний, только пропускная способность значительно возросла:).

Вы должны быть в порядке, установив уровень параллелизма на количество ядер, которое использует ваша машина.

Вы также можете использовать .throttle(...) на вашем источнике, чтобы установить максимальное числозапросов, которые вы хотите предоставить за единицу времени.

В случае, если вы хотите динамически реагировать на максимальное количество запросов, на которые вам отвечает служба, а затем, скажем, ограничить себя до 80%, вам придетсякод какой-то обычай для этого я думаю.Например, Стадия потока, которая подсчитывает запросы в течение некоторого времени, затем соответствующим образом регулирует скорость и делает это снова через некоторое время в случае увеличения обслуживания и т. Д.Но, наконец, обработка запросов на пользователя должна быть на уровне запрашиваемой услуги.

0 голосов
/ 01 апреля 2019

Существует более «оптимальный» способ сделать запросы, чем продемонстрировано в вопросе.

Если вы используете API-интерфейс уровня клиента на уровне клиента , вы можете открыть пул соединений для интересующей вас службы:

val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
  Http().outgoingConnection("http://myserver:8080")

Этот пул может затем использоваться для обработки ваших item запросов:

type Item = ???

val itemToRequest : Item => HttpRequest = 
  item => HttpRequest(uri = URI(item.toString))

source
  .map(itemToRequest)
  .via(connectionFlow)
  .flatMap(_.entity.toStrict(20.seconds))

Затем можно использовать настройки конфигурации akka , чтобы указать максимальное количество открытых запросов для myserver:

host-connection-pool {
  max-open-requests = 8
}
0 голосов
/ 30 марта 2019

Таким образом, метод mapAsync(parallelism) принадлежит object source, и у вас есть несколько вложенных функций - давайте сделаем несколько простых функций вместо одной большой вложенной, и она должна быть оптимизирована.

...