Почему Monix Observable производит на один элемент больше, чем нужно - PullRequest
0 голосов
/ 25 ноября 2018

Я играю с потоками Monix и получил пример, где я строю Observable из Iterator.Мне кажется, что при запуске он производит на 1 элемент больше, чем я ожидал.Следующий код показывает, что:

  val count = AtomicLong(0)
  def produceValue(): Long = {
    count.transformAndGet { i =>
      logger.info(s"Producing value: ${i + 1}")
      i + 1
    }
  }
  def more(): Boolean = count.get < 20

  lazy val iter = new Iterator[Long] {
    override def hasNext: Boolean = more()
    override def next(): Long     = produceValue()
  }    

  Observable
    .fromIterator(iter)
    .mapParallelUnordered(5) { x =>
      Task(x)
        .foreachL { x =>
          logger.info(s"Transforming $x")
        }
        .delayExecution(3.seconds)
    }
    .consumeWith(Consumer.complete)
    .runAsync

Дело довольно простое.Iterator печатает журнал каждый раз, когда выдает значение next.Последующая стадия - это простая отложенная задача, выполняемая с параллельным счетом 5, чтобы увидеть, что происходит.Теперь выходные данные выглядят следующим образом:

[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] -  Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] -  Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] -  Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] -  Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] -  Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] -  Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] -  Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] -  Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] -  Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] -  Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] -  Transforming 17

Как вы можете видеть, первоначально поток производит 6 элементов, в то время как я ожидал бы только 5 (так как нисходящая ступень mapParallelUnordered занимает всего 5 элементов. На самом деле это не так.большое дело, но я просто хочу понять, почему это так.

Кроме того, почему начальные значения создаются в потоке main, а последующие вызываются в execution-context пуле потоков?используя планировщик, который используется для запуска всего потока?

1 Ответ

0 голосов
/ 26 ноября 2018

Как видите, изначально поток генерирует 6 элементов

Протокол низкоуровневой связи разработан вокруг Subscriber и его (унаследованного) метода onNext со следующей сигнатурой:

def onNext(elem: A): Future[Ack]

(источник)

Если мы представляем создание и преобразование, каждое из которых представляет собой стадию, наблюдаемый источник (fromIterator в вашем случае) толкает его значение к подписчикам и, когда подтверждается, толкает следующее.

Итак, что происходит:

  • fromIterator stage генерирует значение 1
  • значение 1 передается на стадию mapAsyncUnordered, где оно принимается (т.к. есть свободные работники), поэтому необходимо немедленно подтвердить Continue
  • Вышеуказанные шаги повторяются для значений 2-5
  • fromIterator ступень генерирует значение 6 (это когда вы видите выходной сигнал)
  • значение 6 перемещается на mapAsyncUnordered ступень.На этот раз он не может быть принят немедленно, поэтому подтверждение должно быть Continue через некоторое время.До этого момента fromIterator.

не генерирует больше значений. Стоит отметить, что не стадия mapAsyncUnordered извлекает значение из fromIterator, а fromIterator генерирует этизначения сами по себе, и он не может знать заранее, примет ли преобразование в нисходящем направлении значение немедленно или нет.


Разве все не должны использовать планировщик, который используется для запуска всего потока?

Monix Observable пытается работать максимально синхронно из соображений производительности (переключение потоков стоит дорого).В общем, если это не контролируется явно такими методами, как executeAsync, executeOn и т. Д., Вы не сможете определить, будет ли операция выполняться в том же потоке или нет.

...