Акка График с дросселирующими сообщениями - PullRequest
0 голосов
/ 23 сентября 2019

Я пытаюсь рассмотреть дроссельные сообщения с использованием потоков akka.

Что касается меня, самая простая идея для этого - прочитать строки потока из файла, обработать его и передать целевой службе.

Итакследующий код охватывает идею (source ~> lines ~> sink):

import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{
  Actor,
  ActorLogging,
  ActorRef,
  ActorSystem,
  OneForOneStrategy,
  Props,
  SupervisorStrategy
}
import akka.stream.scaladsl.{
  FileIO,
  Flow,
  Framing,
  GraphDSL,
  RunnableGraph,
  Sink
}
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode}
import akka.util.ByteString

import scala.concurrent.duration.Duration
import scala.language.postfixOps

class StreamSenderActorV1(target: ActorRef) extends Actor with ActorLogging {
  override def receive: Receive = {
    case StreamBookV1(filename) =>
      val materializer = ActorMaterializer.create(context) // Materializing and running a stream always requires a Materializer to be in implicit scope.
      val sink: Sink[Any, NotUsed] = Sink.actorRef(target, NotUsed)

      import java.nio.file.Paths
      val source = FileIO.fromPath(Paths.get(filename))

      val lines: Flow[ByteString, StreamLineV1, NotUsed] = Framing
        .delimiter(
          ByteString(System.lineSeparator),
          10000,
          allowTruncation = true
        )
        .map(bs => bs.utf8String)
        .map(StreamLineV1)

      lines
        .throttle(1, Duration(2, "seconds"), 1, ThrottleMode.shaping) // throttle - to slow down the stream to 1 element per second.
        .to(sink)

      RunnableGraph
        .fromGraph(GraphDSL.create() {
          implicit builder: GraphDSL.Builder[NotUsed] =>
            import akka.stream.scaladsl.GraphDSL.Implicits._
            source ~> lines ~> sink
            ClosedShape
        })
        .run()(materializer)
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Restart
    }
}
class StreamReceiverActorV1 extends Actor with ActorLogging {
  override def receive: Receive = {
    case StreamLineV1(line) => log.info(s"line: $line")
    case x                  => log info s"all: $x"
  }
}

case class StreamBookV1(fileName: String)
case class StreamLineV1(line: String)

object StreamSenderActorV1 {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Thottler-Messages")
    val target = system.actorOf(Props[StreamReceiverActorV1], "receiver")
    val sender =
      system.actorOf(Props(classOf[StreamSenderActor], target), "sender")

    sender ! StreamBookV1("throttle-streams/src/main/resources/log.txt")

    Thread sleep 4000
    system terminate
  }
}

к сожалению, результат отличается от ожиданий (задержка отсутствует):

[INFO] [09/23/2019 14:11:22.414] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 1.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 2.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 3.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 4.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 5.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 6.
[INFO] [09/23/2019 14:11:22.415] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 7.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 8.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 9.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-5] [akka://Thottler-Messages/user/receiver] all: 10.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-3] [akka://Thottler-Messages/user/receiver] all: 11.
[INFO] [09/23/2019 14:11:22.416] [Thottler-Messages-akka.actor.default-dispatcher-3] [akka://Thottler-Messages/user/receiver] all: NotUsed

Вопрос

Какиспользовать throttle в Graph потоке?

PS

Я пытался применить throttle с source напрямую, результат тот же.

PS2.

Примеркорректировка более простых потоков , предоставляемых akka, работает для меня корректно.

Возможно throttle может быть бесполезным в моем случае из-за применения его для Flow, в котором уже задано правило интеграции для Graph (~> lines ~> sink).... Понятия не имею, как исправить, если для Graph.

Среда:

  • scala 2.13.0
  • sbt 1.3.0
  • akka 2.5.25

1 Ответ

2 голосов
/ 23 сентября 2019

Я думаю, что коренная причина здесь в том, что throttle "применяется" к lines как мутатор - это не так, как это работает :) В основном все эти операторы в потоках (map, filter, throttle, via, mapConcat и т. Д.) Создают только неизменный план вычисления.Таким образом, грубо говоря, каждая операция создает отдельную «копию» проекта, но никогда не изменяет исходную.

Итак, в вашем случае

lines
   .throttle(1, Duration(2, "seconds"), 1, ThrottleMode.shaping)
   .to(sink)

возвращает новый ограниченный «план», а затем сразу отбрасывается.

Итак, способ исправить это просто - используйтечто вернул throttled.Один из вариантов:

val lines = ... // same as now
val throttledLines = lines.throttled(...)

RunnableGraph
        .fromGraph(GraphDSL.create() {
          implicit builder: GraphDSL.Builder[NotUsed] =>
            import akka.stream.scaladsl.GraphDSL.Implicits._
            source ~> throttledLines ~> sink
            ClosedShape
        })
        .run()(materializer)

Однако, поскольку ваш график «линейный», вы можете использовать более элегантный «плавный» интерфейс:

val source = FileIO.fromPath(Paths.get(filename))
val lines = ... // same as now

val runnableStream = source.via(lines).throttle(...).to(sink)
runnableStream.run()(materializer)
...