Ограничить количество сообщений, отправляемых в течение интервала времени - PullRequest
0 голосов
/ 21 сентября 2019

Используя приведенный ниже код, я пытаюсь ограничить количество сообщений, отправляемых актеру в течение указанного периода времени.Но сообщения не блокируются и отправляются как можно быстрее.Нижестоящий актер просто отправляет http-запрос на главную страницу Google.

Код регулирования, в котором я пытаюсь ограничить отправку 3 сообщений в течение 3 секунд:

  val throttler: ActorRef =
    Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      .throttle(3, 1.second)
      .to(Sink.actorRef(printer, NotUsed))
      .run()

Как я могу ограничитьколичество сообщений, отправленных в цикле:

  for( a <- 1 to 10000){

    // Create the 'greeter' actors
    val howdyGreeter: ActorRef =
      system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))

    howdyGreeter ! RequestActor("RequestActor")
    howdyGreeter ! Greet
  }

до 3 в секунду?

весь код:

//https://developer.lightbend.com/guides/akka-quickstart-scala/full-example.html

import akka.NotUsed
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.http.client.methods.HttpGet
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import net.liftweb.json._
import net.liftweb.json.Serialization.write
import org.apache.http.util.EntityUtils
//import akka.contrib.throttle.TimerBasedThrottler
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object Greeter {
  def props(message: String, printerActor: ActorRef): Props = Props(new Greeter(message, printerActor))
  final case class RequestActor(who: String)
  case object Greet
}

class Greeter(message: String, printerActor: ActorRef) extends Actor {
  import Greeter._
  import Printer._

  var greeting = ""

  def receive = {
    case RequestActor(who) =>

      val get = new HttpGet("http://www.google.com")
      val response = (new DefaultHttpClient).execute(get)
//      val responseString = EntityUtils.toString(response.getEntity, "UTF-8")
//      System.out.println(responseString)

      greeting = String.valueOf(response.getStatusLine.getStatusCode)
      println("message is "+message)
//      greeting = message + ", " + who
    case Greet           =>
      printerActor ! Greeting(greeting)
  }
}

object Printer {
  def props: Props = Props[Printer]
  final case class Greeting(greeting: String)
}

class Printer extends Actor with ActorLogging {
  import Printer._

  def receive = {
    case Greeting(greeting) =>
      log.info("Greeting received (from " + sender() + "): " + greeting)
  }
}

object AkkaQuickstart extends App {
  import Greeter._
  // Create the 'helloAkka' actor system
  val system: ActorSystem = ActorSystem("helloAkka")

  // Create the printer actor,this is also the target actor
  val printer: ActorRef = system.actorOf(Printer.props, "printerActor")

  implicit val materializer = ActorMaterializer.create(system)

  val throttler: ActorRef =
    Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      .throttle(3, 1.second)
      .to(Sink.actorRef(printer, NotUsed))
      .run()

  //Create a new actor for each request thread
  for( a <- 1 to 10000){

    // Create the 'greeter' actors
    val howdyGreeter: ActorRef =
      system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))

    howdyGreeter ! RequestActor("RequestActor")
    howdyGreeter ! Greet
  }

}

Ответы [ 2 ]

2 голосов
/ 22 сентября 2019

Актер не может влиять на то, что делают другие актеры, в частности он не контролирует, кто и когда помещает сообщения в свой почтовый ящик, - так работает модель актера.Актер только решает, что делать с сообщениями, которые он находит в своем почтовом ящике, и над этим он имеет полный контроль.Например, он может отбрасывать их, отправлять ответы об ошибках, буферизовать их и т. Д.

Если вы хотите, чтобы удушение и обратное давление, я рекомендую вообще не использовать Actors для этой части, а использовать только потоки Akka.Код, который генерирует ваши сообщения запроса, должен быть источником, а не циклом for.Какой источник является наиболее подходящим, полностью зависит от вашего реального варианта использования, например, создание потока из строгой коллекции с Source.from() или асинхронное извлечение новых элементов из структуры данных с помощью Source.unfoldAsync плюс многие другие.Это гарантирует, что запросы будут отправляться только в подходящее время в соответствии с нисходящей пропускной способностью или регулированием скорости.

1 голос
/ 21 сентября 2019

Мне не кажется, что вы на самом деле используете регулятор:

  val throttler: ActorRef =
    Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      .throttle(3, 1.second)
      .to(Sink.actorRef(printer, NotUsed))
      .run()

Но я не вижу сообщений, отправляемых на throttler в вашем коде: throttler будет толькосообщения газа, отправленные на throttler.

...