Я выяснил, что на самом деле делает его медленным в этом случае.
Я поменял приемник FileIO с вопроса на рукописный с некоторым счетчиком времени, чтобы измерить стоимость каждого шага враковина.
Новая раковина находится здесь:
final class FileWriteSink extends GraphStage[SinkShape[Array[Char]]] {
private val in: Inlet[Array[Char]] = Inlet("ArrayOfCharInlet")
override def shape: SinkShape[Array[Char]] = SinkShape.of(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
// note that the operations to these vars below are not thread-safe
// but it is fairly enough to show the time differences in a large scale with a relatively low cost
private var count = 0L
private var grabTime = 0L
private var writeTime = 0L
private var pullTime = 0L
private var gapTime = 0L
private var counterTime = 0L
private var lastTime = 0L
private var currTime = System.nanoTime()
@inline private def timeDiff(): Long = {
lastTime = currTime
currTime = System.nanoTime()
currTime - lastTime
}
private val bw = Files.newBufferedWriter(Paths.get("test.xml"))
setHandler(in, new InHandler {
override def onPush(): Unit = {
gapTime += timeDiff()
count += 1
if (count % 1000000 == 0) {
println(s"count: $count, gapTime: $gapTime, counterTime: $counterTime, grabTime: $grabTime, writeTime: $writeTime, pullTime: $pullTime")
println(s"count: $count, gapTime-avg: ${gapTime / count}, counterTime-avg: ${counterTime / count}, grabTime-avg: ${grabTime / count}, writeTime-avg: ${writeTime / count}, pullTime-avg: ${pullTime / count}")
}
counterTime += timeDiff()
val v = grab(in)
grabTime += timeDiff()
bw.write(v)
writeTime += timeDiff()
pull(in)
pullTime += timeDiff()
}
})
override def preStart(): Unit = {
pull(in)
}
}
}
}
А потом я получил этот журнал из моей среды тестирования:
count: 1000000, gapTime: 3220562882, counterTime: 273008576, grabTime: 264956553, writeTime: 355040917, pullTime: 260033342
count: 1000000, gapTime-avg: 3220, counterTime-avg: 273, grabTime-avg: 264, writeTime-avg: 355, pullTime-avg: 260
count: 2000000, gapTime: 6307318517, counterTime: 549671865, grabTime: 532654603, writeTime: 708526613, pullTime: 524305026
count: 2000000, gapTime-avg: 3153, counterTime-avg: 274, grabTime-avg: 266, writeTime-avg: 354, pullTime-avg: 262
count: 3000000, gapTime: 9403004835, counterTime: 821901662, grabTime: 797670212, writeTime: 1054416804, pullTime: 786163401
count: 3000000, gapTime-avg: 3134, counterTime-avg: 273, grabTime-avg: 265, writeTime-avg: 351, pullTime-avg: 262
Оказывается, времяпромежуток между вызовом pull () и следующим onPush () очень медленный.
Даже если буфер заполнен, поэтому Sink не нужноподождите, пока источник сгенерирует следующий элемент.Между двумя вызовами onPush () в моей тестовой среде все еще остается промежуток времени почти в 3 мкс.
Так что я должен ожидать, что поток Akka будет иметь большую общую пропускную способность.В то время как промежуток времени между двумя вызовами onPush () необходимо тщательно учитывать и обрабатывать при проектировании структуры фактического потока.