Как я могу перенаправить сообщения актеру IO (Tcp) в Akka? - PullRequest
0 голосов
/ 21 марта 2019

У меня есть такой актер

class TcpClientActor(target: Target) extends Actor with Logger {

  override def preStart(): Unit = {
    self ! TestConnection
  }

  override def receive: Receive = {
    case TestConnection =>
      IO(Tcp) ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)

    case failed@CommandFailed(_: Connect) =>
      info(s"Failure: $target.endpoint:$target.port")
      shutdown()

    case Connected(_, _) =>
      info(s"Success: $target.endpoint:$target.port")
      sender() ! Close
      shutdown()

  }

  def shutdown(): Unit = {
    context stop self
  }
}

Я перебираю файл с конечными точками для проверки и создаю одного из этих акторов с каждой строкой в ​​качестве аргумента конструктора типа Target.Я хочу иметь возможность регулировать количество параллельных TCP-соединений для инициализации до некоторого заданного числа, есть ли встроенные механизмы, которые я могу использовать в Akka, чтобы гарантировать, что я не перегружу систему, просто сразу создав TcpClientActor для каждоголиния ввода и выбивание разъема подключения?

1 Ответ

1 голос
/ 28 марта 2019

Я бы использовал Akka Stream для регулирования количества сообщений

import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.{ ActorMaterializer, OverflowStrategy, ThrottleMode }
import akka.stream.scaladsl.{ Sink, Source }

object TcpThrottle {
  def throttler(ratePerSecond: Int, burstRate: Option[Int], bufferSize: Int = 1000)(implicit materializer: ActorMaterializer): ActorRef =
    Source.actorRef(bufferSize = bufferSize, OverflowStrategy.dropNew)
      .throttle(ratePerSecond, 1.second, burstRate.getOrElse(ratePerSecond), ThrottleMode.Shaping)
      .to(Sink.actorRef(IO(Tcp), NotUsed)
      .run()
 }

class TcpClientActor(target: Target) extends Actor with Logger {
  val throttler = TcpThrottle.throttler(1, Some(5))

  // otherwise identical

  def receive: Receive = {
    case TestConnection => throttler ! Connect(remoteAddress = new InetSocketAddress(target.endpoint, target.port), localAddress = None, options = Nil, timeout = Some(timeout), pullMode = false)

    // other cases identical
  }
}

Адаптировано с Руководство по миграции Akka 2.5 .Это может быть необходимо

...