У меня есть пара вопросов на основе приведенного ниже примера кода.
1) Я указал
akka.cluster.use-dispatcher = cluster-dispatcher in my config.
Когда я ставлю точку останова на этой строке в Frontend.scala,
_frontend = system.actorOf(Props[Frontend],
name = "frontend")
Я вижу диспетчера по умолчанию внутри объекта "_frontend".Почему он не получил кластер-диспетчер из конфигурации?
2) Я хочу смоделировать сценарий блокировки, о котором говорится в этой документации.https://doc.akka.io/docs/akka/2.5/dispatchers.html#problem-blocking-on-default-dispatcher Я попытался установить диспетчер по умолчанию
default-dispatcher {
fork-join-executor {
parallelism-min = 1
parallelism-max = 1
throughput = 1
}
}
И я подумал, что один "прием" в бэкэнде будет обрабатываться одновременно.Прежде всего, я снова отлаживаю объект "_frontend" и не думаю, что он читает по умолчанию.Во-вторых, если у вас есть несколько действующих лиц, работающих в разных удаленных процессах, что означает, что все действующие лица совместно используют один и тот же диспетчер по умолчанию, и что блокирование задач может вызвать истощение потока?Если актеры работают в разных процессах, не думаете ли они, что у каждого из них есть свой пул потоков?Суть в том, что, если вы можете дать мне пример или изменить ниже, что я могу создать сценарий истощения потоков, то я могу лучше понять, о чем идет речь.Спасибо за вашу помощь.
Благодать
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
// default-dispatcher {
// fork-join-executor {
// parallelism-min = 1
// parallelism-max = 1
// throughput = 1
// }
// }
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
akka.cluster.use-dispatcher = cluster-dispatcher
cluster-dispatcher {
type = "Dispatcher"
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 1
parallelism-max = 1
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
}
akka.cluster.min-nr-of-members = 3
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
akka.actor.deployment {
/frontend/backendRouter {
# Router type provided by metrics extension.
router = adaptive-group
# Router parameter specific for metrics extension.
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
#
nr-of-instances = 100
routees.paths = ["/user/backend"]
cluster {
enabled = on
use-role = backend
allow-local-routees = off
}
}
}
===========================
package com.packt.akka.loadBalancing
import com.packt.akka.commons.Add
object LoadBalancingApp extends App {
//initiate three nodes from backend
Backend.initiate(2551)
Backend.initiate(2552)
Backend.initiate(2561)
//initiate frontend node
Frontend.initiate()
Thread.sleep(10000)
Frontend.getFrontend ! Add(2, 4)
}
=============================
package com.packt.akka.loadBalancing
import akka.cluster._
import com.packt.akka.commons._
import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterEvent.MemberUp
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, RootActorPath }
class Backend extends Actor {
def receive = {
case Add(num1, num2) =>
println(s"I'm a backend with path: ${self} and I received add operation.")
Thread.sleep(60000)
println(s"I'm a backend with path: ${self} and I am done with add operation.")
}
}
object Backend {
def initiate(port: Int){
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("loadbalancer"))
val system = ActorSystem("ClusterSystem", config)
val Backend = system.actorOf(Props[Backend], name = "backend")
Backend
}
}
=====================
package com.packt.akka.loadBalancing
import com.packt.akka.commons._
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.Cluster
import akka.routing.FromConfig
import akka.actor.ReceiveTimeout
import scala.util.Random
class Frontend extends Actor {
import context.dispatcher
val backend = context.actorOf(FromConfig.props(), name = "backendRouter")
context.system.scheduler.schedule(3.seconds, 3.seconds, self,
Add(Random.nextInt(100), Random.nextInt(100)))
def receive = {
case addOp: Add =>
println("Frontend: I'll forward add operation to backend node to handle it.")
backend forward addOp
}
}
object Frontend {
private var _frontend: ActorRef = _
val upToN = 200
def initiate() = {
val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
withFallback(ConfigFactory.load("loadbalancer"))
val system = ActorSystem("ClusterSystem", config)
system.log.info("Frontend will start when 2 backend members in the cluster.")
//#registerOnUp
Cluster(system) registerOnMemberUp {
_frontend = system.actorOf(Props[Frontend],
name = "frontend")
}
//#registerOnUp
}
def getFrontend = _frontend
}