Понимание Акка Диспетчер - PullRequest
0 голосов
/ 20 октября 2018

У меня есть пара вопросов на основе приведенного ниже примера кода.

1) Я указал

akka.cluster.use-dispatcher = cluster-dispatcher in my config. 

Когда я ставлю точку останова на этой строке в Frontend.scala,

   _frontend = system.actorOf(Props[Frontend],
        name = "frontend")

Я вижу диспетчера по умолчанию внутри объекта "_frontend".Почему он не получил кластер-диспетчер из конфигурации?

2) Я хочу смоделировать сценарий блокировки, о котором говорится в этой документации.https://doc.akka.io/docs/akka/2.5/dispatchers.html#problem-blocking-on-default-dispatcher Я попытался установить диспетчер по умолчанию

default-dispatcher {
  fork-join-executor {
    parallelism-min = 1
    parallelism-max = 1
    throughput = 1
  }
}

И я подумал, что один "прием" в бэкэнде будет обрабатываться одновременно.Прежде всего, я снова отлаживаю объект "_frontend" и не думаю, что он читает по умолчанию.Во-вторых, если у вас есть несколько действующих лиц, работающих в разных удаленных процессах, что означает, что все действующие лица совместно используют один и тот же диспетчер по умолчанию, и что блокирование задач может вызвать истощение потока?Если актеры работают в разных процессах, не думаете ли они, что у каждого из них есть свой пул потоков?Суть в том, что, если вы можете дать мне пример или изменить ниже, что я могу создать сценарий истощения потоков, то я могу лучше понять, о чем идет речь.Спасибо за вашу помощь.
Благодать

  akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
    //    default-dispatcher {
    //      fork-join-executor {
    //        parallelism-min = 1
    //        parallelism-max = 1
    //        throughput = 1
    //      }
    //    }
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }

      akka.cluster.use-dispatcher = cluster-dispatcher

      cluster-dispatcher {
        type = "Dispatcher"
        executor = "fork-join-executor"
        fork-join-executor {
          parallelism-min = 1
          parallelism-max = 1
        }
      }


      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]

        auto-down-unreachable-after = 10s
      }
    }

    akka.cluster.min-nr-of-members = 3


    akka.cluster.role {
      frontend.min-nr-of-members = 1
      backend.min-nr-of-members = 2
    }

    akka.actor.deployment {
      /frontend/backendRouter {
        # Router type provided by metrics extension.
        router = adaptive-group
        # Router parameter specific for metrics extension.
        # metrics-selector = heap
        # metrics-selector = load
        # metrics-selector = cpu
        metrics-selector = mix
        #
        nr-of-instances = 100
        routees.paths = ["/user/backend"]
        cluster {
          enabled = on
          use-role = backend
          allow-local-routees = off
        }
      }
    }

===========================

package com.packt.akka.loadBalancing

import com.packt.akka.commons.Add

object LoadBalancingApp extends App {

//initiate three nodes from backend
Backend.initiate(2551)

Backend.initiate(2552)

Backend.initiate(2561)

//initiate frontend node
Frontend.initiate()

Thread.sleep(10000)

Frontend.getFrontend ! Add(2, 4)

}

=============================

package com.packt.akka.loadBalancing

import akka.cluster._
import com.packt.akka.commons._
import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterEvent.MemberUp
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, RootActorPath }

class Backend extends Actor {

  def receive = {
    case Add(num1, num2) =>
      println(s"I'm a backend with path: ${self} and I received add operation.")
      Thread.sleep(60000)
      println(s"I'm a backend with path: ${self} and I am done with add operation.")
  }

}

object Backend {
  def initiate(port: Int){
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load("loadbalancer"))

    val system = ActorSystem("ClusterSystem", config)

    val Backend = system.actorOf(Props[Backend], name = "backend")

    Backend
  }
}

=====================

    package com.packt.akka.loadBalancing

    import com.packt.akka.commons._
    import scala.concurrent.duration._
    import com.typesafe.config.ConfigFactory
    import akka.actor.{ Actor, ActorRef, ActorSystem, Props } 
    import akka.cluster.Cluster
    import akka.routing.FromConfig
    import akka.actor.ReceiveTimeout
    import scala.util.Random


    class Frontend extends Actor {
      import context.dispatcher

      val backend = context.actorOf(FromConfig.props(), name = "backendRouter")

      context.system.scheduler.schedule(3.seconds, 3.seconds, self,
        Add(Random.nextInt(100), Random.nextInt(100)))

      def receive = {
        case addOp: Add =>
          println("Frontend: I'll forward add operation to backend node to handle it.")
          backend forward addOp

      }

    }


object Frontend {

  private var _frontend: ActorRef = _ 

  val upToN = 200

  def initiate() = {
    val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
      withFallback(ConfigFactory.load("loadbalancer"))

    val system = ActorSystem("ClusterSystem", config)
    system.log.info("Frontend will start when 2 backend members in the cluster.")
    //#registerOnUp
    Cluster(system) registerOnMemberUp {
      _frontend = system.actorOf(Props[Frontend],
        name = "frontend")
    }
    //#registerOnUp

  }

  def getFrontend = _frontend
}

1 Ответ

0 голосов
/ 02 ноября 2018

1) См. Документацию akka.cluster.use-dispatcher = cluster-dispatcher in my config. в reference.conf :

# The id of the dispatcher to use for cluster actors. If not specified
# default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher

Этот параметр позволяет вам настроить то, что диспетчер используется для «внутренних» акторов кластера,не для ваших собственных актеров.

2) Параметр parallelism-max для ForkJoinPool не ограничивает количество реальных потоков.Как объяснено в примечании в документации :

Обратите внимание, что параллелизм-max не устанавливает верхнюю границу общего числа потоков, выделенных ForkJoinPool.Это параметр, который конкретно говорит о количестве горячих потоков, которые пул продолжает работать, чтобы уменьшить задержку обработки новой входящей задачи.Вы можете прочитать больше о параллелизме в документации JDK ForkJoinPool .

Вы правы в том, что когда актеры работают в разных процессах JVM, у них есть отдельные диспетчеры.

Если вы хотите провести эксперимент и увидеть проблемы с истощением потоков в действии, самый простой подход - создать актера, который использует блокирующие вызовы (например, Thread.sleep) при обработке сообщений.Теперь приступайте к созданию множества экземпляров этого актера и отправляйте им сообщения.Вы увидите, что ваша программа продвигается очень медленно.

В отличие от этого, если вы пишете тот же актер, но выполняете «отложенные вычисления» с помощью планировщика, а не с Thread.sleep, вы должны увидеть гораздо лучшую производительность.

...