Поток Akka слишком медленный по сравнению с двумя потоками с ArrayBlockingQueue - PullRequest
0 голосов
/ 15 мая 2018

Я работаю над проблемой производительности, которая возникла при переключении текущего проекта в поток Akka.

После упрощения проблем кажется, что поток Akka передавал гораздо меньше сообщений, чем я ожидал.

Здесь у меня есть два очень простых фрагмента кода, каждый из которых просто записывает 10 байт времени в файл на диске.

Первый использует два потока и связывает их ArrayBlockingQueue:

val bw = Files.newBufferedWriter(Paths.get("test.txt"))
val target = "0123456789".toCharArray
val abq = new ArrayBlockingQueue[Array[Char]](10000)

new Thread(new Runnable {
  override def run(): Unit = {
    while (true) {
      bw.write(abq.take())
    }
  }
}).start()

while (true) {
  abq.put(target)
}

Второй использует поток Akka:

implicit val system: ActorSystem = ActorSystem("TestActorSystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Source & Sink runs in two actors
// Both output of Source & input of Sink were buffered
Source
  .repeat(ByteString("0123456789"))
  .buffer(8192, OverflowStrategy.backpressure)
  .async
  .runWith(
    FileIO
      .toPath(Paths.get("test.txt"))
      .withAttributes(Attributes.inputBuffer(8192, 8192))
  )

И я обнаружил, что первый записывает файл со скоростью 27,4 МБ / с, а второй - только со скоростью 3,4.МБ / с на моей тестовой машине.Поток с массивомBlockingQueue один был в 8 раз быстрее, чем Akka.

Я попытался изменить Sink с FileIO на рукописный Sink, который пишет в BufferedWriter.Это позволило скорости второго увеличить до 5,5 МБ / с, но все еще в 5 раз медленнее, чем первый.

В моем понимании поток Akka имел бы гораздо лучшую производительность по сравнению с тем, который он достигает сейчас.

Что-то, что я сделал, было неправильно в этом сценарии?

1 Ответ

0 голосов
/ 17 мая 2018

Я выяснил, что на самом деле делает его медленным в этом случае.

Я поменял приемник FileIO с вопроса на рукописный с некоторым счетчиком времени, чтобы измерить стоимость каждого шага враковина.

Новая раковина находится здесь:

final class FileWriteSink extends GraphStage[SinkShape[Array[Char]]] {

  private val in: Inlet[Array[Char]] = Inlet("ArrayOfCharInlet")

  override def shape: SinkShape[Array[Char]] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // note that the operations to these vars below are not thread-safe
      // but it is fairly enough to show the time differences in a large scale with a relatively low cost
      private var count = 0L

      private var grabTime = 0L
      private var writeTime = 0L
      private var pullTime = 0L
      private var gapTime = 0L
      private var counterTime = 0L

      private var lastTime = 0L
      private var currTime = System.nanoTime()

      @inline private def timeDiff(): Long = {
        lastTime = currTime
        currTime = System.nanoTime()
        currTime - lastTime
      }

      private val bw = Files.newBufferedWriter(Paths.get("test.xml"))
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          gapTime += timeDiff()
          count += 1
          if (count % 1000000 == 0) {
            println(s"count: $count, gapTime: $gapTime, counterTime: $counterTime, grabTime: $grabTime, writeTime: $writeTime, pullTime: $pullTime")
            println(s"count: $count, gapTime-avg: ${gapTime / count}, counterTime-avg: ${counterTime / count}, grabTime-avg: ${grabTime / count}, writeTime-avg: ${writeTime / count}, pullTime-avg: ${pullTime / count}")
          }
          counterTime += timeDiff()
          val v = grab(in)
          grabTime += timeDiff()
          bw.write(v)
          writeTime += timeDiff()
          pull(in)
          pullTime += timeDiff()
        }
      })

      override def preStart(): Unit = {
        pull(in)
      }
    }
  }

}

А потом я получил этот журнал из моей среды тестирования:

count: 1000000, gapTime: 3220562882, counterTime: 273008576, grabTime: 264956553, writeTime: 355040917, pullTime: 260033342
count: 1000000, gapTime-avg: 3220, counterTime-avg: 273, grabTime-avg: 264, writeTime-avg: 355, pullTime-avg: 260
count: 2000000, gapTime: 6307318517, counterTime: 549671865, grabTime: 532654603, writeTime: 708526613, pullTime: 524305026
count: 2000000, gapTime-avg: 3153, counterTime-avg: 274, grabTime-avg: 266, writeTime-avg: 354, pullTime-avg: 262
count: 3000000, gapTime: 9403004835, counterTime: 821901662, grabTime: 797670212, writeTime: 1054416804, pullTime: 786163401
count: 3000000, gapTime-avg: 3134, counterTime-avg: 273, grabTime-avg: 265, writeTime-avg: 351, pullTime-avg: 262

Оказывается, времяпромежуток между вызовом pull () и следующим onPush () очень медленный.

Даже если буфер заполнен, поэтому Sink не нужноподождите, пока источник сгенерирует следующий элемент.Между двумя вызовами onPush () в моей тестовой среде все еще остается промежуток времени почти в 3 мкс.

Так что я должен ожидать, что поток Akka будет иметь большую общую пропускную способность.В то время как промежуток времени между двумя вызовами onPush () необходимо тщательно учитывать и обрабатывать при проектировании структуры фактического потока.

...