Я пытаюсь рассмотреть дроссельные сообщения с использованием потоков akka.
Что касается меня, самая простая идея для этого - прочитать строки потока из файла, обработать его и передать целевой службе.
Итакследующий код охватывает идею (source ~> lines ~> sink
):
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{
Actor,
ActorLogging,
ActorRef,
ActorSystem,
OneForOneStrategy,
Props,
SupervisorStrategy
}
import akka.stream.scaladsl.{
FileIO,
Flow,
Framing,
GraphDSL,
RunnableGraph,
Sink
}
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode}
import akka.util.ByteString
import scala.concurrent.duration.Duration
import scala.language.postfixOps
class StreamSenderActorV1(target: ActorRef) extends Actor with ActorLogging {
override def receive: Receive = {
case StreamBookV1(filename) =>
val materializer = ActorMaterializer.create(context) // Materializing and running a stream always requires a Materializer to be in implicit scope.
val sink: Sink[Any, NotUsed] = Sink.actorRef(target, NotUsed)
import java.nio.file.Paths
val source = FileIO.fromPath(Paths.get(filename))
val lines: Flow[ByteString, StreamLineV1, NotUsed] = Framing
.delimiter(
ByteString(System.lineSeparator),
10000,
allowTruncation = true
)
.map(bs => bs.utf8String)
.map(StreamLineV1)
lines
.throttle(1, Duration(2, "seconds"), 1, ThrottleMode.shaping) // throttle - to slow down the stream to 1 element per second.
.to(sink)
RunnableGraph
.fromGraph(GraphDSL.create() {
implicit builder: GraphDSL.Builder[NotUsed] =>
import akka.stream.scaladsl.GraphDSL.Implicits._
source ~> lines ~> sink
ClosedShape
})
.run()(materializer)
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(10, Duration(60, "seconds")) {
case _: Exception => Restart
}
}
class StreamReceiverActorV1 extends Actor with ActorLogging {
override def receive: Receive = {
case StreamLineV1(line) => log.info(s"line: $line")
case x => log info s"all: $x"
}
}
case class StreamBookV1(fileName: String)
case class StreamLineV1(line: String)
object StreamSenderActorV1 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("Thottler-Messages")
val target = system.actorOf(Props[StreamReceiverActorV1], "receiver")
val sender =
system.actorOf(Props(classOf[StreamSenderActor], target), "sender")
sender ! StreamBookV1("throttle-streams/src/main/resources/log.txt")
Thread sleep 4000
system terminate
}
}
к сожалению, результат отличается от ожиданий (задержка отсутствует):
[INFO] [09/23/2019 14:11:22.414] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 1.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 2.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 3.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 4.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 5.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 6.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 7.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 8.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 9.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 10.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-3] [akka://Thottler-Messages/user/receiver] all: 11.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-3] [akka://Thottler-Messages/user/receiver] all: NotUsed
Вопрос
Какиспользовать throttle
в Graph
потоке?
PS
Я пытался применить throttle
с source
напрямую, результат тот же.
PS2.
Примеркорректировка более простых потоков , предоставляемых akka, работает для меня корректно.
Возможно throttle
может быть бесполезным в моем случае из-за применения его для Flow
, в котором уже задано правило интеграции для Graph (~> lines ~> sink
).... Понятия не имею, как исправить, если для Graph.
Среда:
scala 2.13.0
sbt 1.3.0
akka 2.5.25