Я не думаю, что это так.Хотя я не могу объяснить это, модифицированная версия приложения с потоком, который предполагает высокую скорость, не дает желаемой высокой скорости.
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import concurrent.duration._
object StreamsPublisher extends App {
implicit private val actorSystem = ActorSystem()
private val value = ActorMaterializerSettings(actorSystem).withSupervisionStrategy{ex =>
ex.printStackTrace()
Supervision.Resume
}
private val materializer = ActorMaterializer(value)
implicit private val mater = materializer
val publisher: Publisher[Int] = Source
.fromIterator { () =>
println("new iterator created")
Stream.from(1).iterator
}
.runWith(Sink.asPublisher(true))
Source.fromPublisher(publisher).runWith(Sink.ignore)
Thread.sleep(2000)
Source
.fromPublisher(publisher)
.throttle(1, 2.seconds)
.runForeach(x => println(s"AAAAA: $x"))
Thread.sleep(3000)
Source
.fromPublisher(publisher)
.throttle(1, 4.seconds)
.runForeach(x => println (s"BBBBB: $x"))
Thread.sleep(5000)
Source
.fromPublisher(publisher)
.throttle(1, 6.seconds)
.runForeach(x => println(s"CCCCC: $x"))
Source
.fromPublisher(publisher)
.runForeach(x => println(s"DDDDDD: $x"))
}
«Быстрый» поток «DDDDD» работает не так быстро.
Выходные данные
new iterator created
AAAAA: 440617
AAAAA: 440618
BBBBB: 440633
AAAAA: 440619
AAAAA: 440620
BBBBB: 440634
CCCCC: 440633
DDDDDD: 440633
DDDDDD: 440634
DDDDDD: 440635
DDDDDD: 440636
DDDDDD: 440637
DDDDDD: 440638
DDDDDD: 440639
DDDDDD: 440640
DDDDDD: 440641
DDDDDD: 440642
DDDDDD: 440643
DDDDDD: 440644
DDDDDD: 440645
DDDDDD: 440646
DDDDDD: 440647
DDDDDD: 440648
AAAAA: 440621
AAAAA: 440622
BBBBB: 440635
AAAAA: 440623
DDDDDD: 440649
DDDDDD: 440650
DDDDDD: 440651
DDDDDD: 440652
DDDDDD: 440653
DDDDDD: 440654
DDDDDD: 440655
DDDDDD: 440656
AAAAA: 440624
CCCCC: 440634
BBBBB: 440636
AAAAA: 440625
AAAAA: 440626
BBBBB: 440637
AAAAA: 440627
CCCCC: 440635
AAAAA: 440628
BBBBB: 440638
AAAAA: 440629
AAAAA: 440630
CCCCC: 440636
BBBBB: 440639
AAAAA: 440631
DDDDDD: 440657
DDDDDD: 440658
DDDDDD: 440659
DDDDDD: 440660
DDDDDD: 440661
DDDDDD: 440662
DDDDDD: 440663
DDDDDD: 440664
AAAAA: 440632
BBBBB: 440640
AAAAA: 440633
CCCCC: 440637
Обратите внимание, что поток «игнорировать» был быстрым и итерировался до 440 КБ, прежде чем присоединялись дросселированные потоки.
Похоже, что существует буфер небольшого размера, от которого зависит газ.