Используя приведенный ниже код, я пытаюсь ограничить количество сообщений, отправляемых актеру в течение указанного периода времени.Но сообщения не блокируются и отправляются как можно быстрее.Нижестоящий актер просто отправляет http-запрос на главную страницу Google.
Код регулирования, в котором я пытаюсь ограничить отправку 3 сообщений в течение 3 секунд:
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(3, 1.second)
.to(Sink.actorRef(printer, NotUsed))
.run()
Как я могу ограничитьколичество сообщений, отправленных в цикле:
for( a <- 1 to 10000){
// Create the 'greeter' actors
val howdyGreeter: ActorRef =
system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))
howdyGreeter ! RequestActor("RequestActor")
howdyGreeter ! Greet
}
до 3 в секунду?
весь код:
//https://developer.lightbend.com/guides/akka-quickstart-scala/full-example.html
import akka.NotUsed
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.http.client.methods.HttpGet
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import net.liftweb.json._
import net.liftweb.json.Serialization.write
import org.apache.http.util.EntityUtils
//import akka.contrib.throttle.TimerBasedThrottler
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object Greeter {
def props(message: String, printerActor: ActorRef): Props = Props(new Greeter(message, printerActor))
final case class RequestActor(who: String)
case object Greet
}
class Greeter(message: String, printerActor: ActorRef) extends Actor {
import Greeter._
import Printer._
var greeting = ""
def receive = {
case RequestActor(who) =>
val get = new HttpGet("http://www.google.com")
val response = (new DefaultHttpClient).execute(get)
// val responseString = EntityUtils.toString(response.getEntity, "UTF-8")
// System.out.println(responseString)
greeting = String.valueOf(response.getStatusLine.getStatusCode)
println("message is "+message)
// greeting = message + ", " + who
case Greet =>
printerActor ! Greeting(greeting)
}
}
object Printer {
def props: Props = Props[Printer]
final case class Greeting(greeting: String)
}
class Printer extends Actor with ActorLogging {
import Printer._
def receive = {
case Greeting(greeting) =>
log.info("Greeting received (from " + sender() + "): " + greeting)
}
}
object AkkaQuickstart extends App {
import Greeter._
// Create the 'helloAkka' actor system
val system: ActorSystem = ActorSystem("helloAkka")
// Create the printer actor,this is also the target actor
val printer: ActorRef = system.actorOf(Printer.props, "printerActor")
implicit val materializer = ActorMaterializer.create(system)
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(3, 1.second)
.to(Sink.actorRef(printer, NotUsed))
.run()
//Create a new actor for each request thread
for( a <- 1 to 10000){
// Create the 'greeter' actors
val howdyGreeter: ActorRef =
system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))
howdyGreeter ! RequestActor("RequestActor")
howdyGreeter ! Greet
}
}