Flink увеличивает параллельность асинхронной операции - PullRequest
0 голосов
/ 04 июля 2019

У нас есть AsyncFunction, асинхронная операция выполняется с использованием akka http client

class Foo[A,B] extends AsyncFunction[A, B] with {
  val akkaConfig = ConfigFactory.load()
  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
  implicit lazy val system = ActorSystem("MyActorSystem", akkaConfig)
  implicit lazy val materializer = ActorMaterializer()
    def postReq(uriStr: String, str: String): Future[HttpResponse] = {
        Http().singleRequest(HttpRequest(
          method = HttpMethods.POST,
          uri = uriStr,
          entity = HttpEntity(ContentTypes.`application/json`, str))
        )
      }

 override def asyncInvoke(input: A, resultFuture: ResultFuture[B]) : Unit  = {
    val resultFutureRequested: Future[HttpResponse] = postReq(...)
//the rest of the class ...

Вопросы:

  1. Если я хочу увеличить параллелизм http-запросов - я должен сделать это, используя конфигурацию akka, или есть способ настроить это через flink.yamel
  2. Поскольку Flink также использует akka, это правильный способ создания ActorSystem и ExecutionContext?

1 Ответ

1 голос
/ 06 июля 2019

Что касается первого вопроса, у вас есть три различных параметра, которые могут повлиять на производительность и количество фактически выполненных запросов:

  1. Параллелизм, это заставит Flink создать несколько экземпляров Вашего AsyncFunction, включая несколько экземпляров Вашего HttpClient.
  2. Количество одновременных запросов в самой функции. Когда вы звоните orderedWait или unorderedWait Вы должны указать в функции capacity, что ограничит количество одновременных запросов.
  3. Актуальные настройки Вашего Http-клиента.

Как видите, пункты 2. и 3. связаны, так как Flink может ограничивать количество возможных одновременных запросов, поэтому иногда изменения в настройках вашего Http-клиента могут не повлиять, так как количество запросов ограничен Флинком.

Увеличение пропускной способности Вашего AsyncFunction зависит от случая. Вы должны помнить, что AsyncFunction вызывается в ОДНОЙ РЕЗЬБЕ. По сути, это означает, что если время ответа службы, которую вы вызываете, велико, вы просто заблокируете количество запросов, ожидающих ответа, и, таким образом, единственный способ - увеличить parallelism'. Однако, как правило, изменение настроек HttpClient и capacity функции должно позволить вам повысить пропускную способность.

Что касается второго вопроса, я не вижу проблемы с созданием кратного ActorSystems. Вы можете увидеть ответ на аналогичный вопрос [здесь]. 1

...