как Monix использует противодавление с оператором flatMap? - PullRequest
4 голосов
/ 25 апреля 2019

Monix использует Ack для синхронизации отправляемых сообщений, но если я использую groupBy и flatMap, внутренняя наблюдаемая не будет следовать обратному давлению из source.

См. Этот тестовый код, пожалуйста:

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test


class MonixBackpressureWithGroupByTest2 {
  @Test
  def test(): Unit = {
    val source = Observable.range(0,130)

    val backPressuredStream = source.map(x => {
        println("simple log first  map - " + x)
        x
      })
      .asyncBoundary(OverflowStrategy.BackPressure(5))
      .map { i =>

        println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
        ((i % 3) toString) -> i
      }
      .groupBy{case (k, v) => k}
      .flatMap(x => {
        val mapWithSleep = x.map{case groupedMsg@(key, value) =>
          Thread.sleep(2000)
          println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
          groupedMsg
        }

        mapWithSleep

      })

    backPressuredStream.share.subscribe(
      (keyAndValue: (String, Long)) => Continue
    )

    global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
      println("========sleep 1 second ============")
    })

    Thread.currentThread().join()

  }

}

выход:

...

========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...

, которые появляются с некоторым противодавлением, не соответствующим:
после: sleep 2 second for every message ... противодавление даеттри элемента after backpressure map - ...

как sleep 2 second for every message ... может иметь отношение один к одному с after backpressure map - ... с точки зрения противодавления?

и еще одно удивление: почему журнал sleep 2 second for every message выводит (0, 72), (0, 75), (0,78), но такие вещи (0, 72), (1, 73), (2,74)?

спасибо.

Monix версия: "io.monix" %% "monix" % "3.0.0-RC1"

1 Ответ

2 голосов
/ 26 апреля 2019

Поведение, которое вы видите, именно то, что вы можете ожидать.

Чтобы кратко изложить, что делает ваше приложение, позвольте мне объяснить это словами:


У вас есть Observable для генерации чисел и создания побочного эффекта для каждого элемента.

Затем вы группируете элементы по _ % 3.

Далее вы делаете еще несколько побочных эффектов (спите и пишете в консоль) внутри каждой группы Observable.

Затем вы flatMap каждой группы Observable, в результате чего получается один плоский Observable.


Так почему же вы вначале видите только первую группу (где _ % 3 == 0), печатающую материал для консоли? ***

Ответ лежит в flatMap: при просмотре документации для Observable вы найдете следующее описание для flatMap:

final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]

Alias for concatMap.

[...]

Думайте о Observable с, как если бы вы думали о List с в секунду: когда вы объединяете List с, вы получите один List, содержащий сначала элементы первого List далее следуют элементы второго List и т. д.

В Monix такое же поведение достигается для Observable, ожидая, пока первая Observable, полученная внутри операции flatMap (читай: concatMap), отправит сигнал «выполнено». Только тогда будет потреблен второй Observable и т. Д.

Или, проще говоря, flatMap заботится о последовательности произведенных Observable с.

Но когда Observable s в вашей flatMap операции завершается? Для этого мы должны понять, как работает groupBy, потому что именно там они и появились.

Чтобы groupBy работал, хотя Observable s лениво оценивается, он должен хранить входящие элементы в буфере. Я не уверен на 100% в этом, но если groupBy работает так, как я думаю, то для любой сгруппированной Observable, которая тянет следующий элемент, будет проходить исходный Observable бесконечно, пока не найдет принадлежащий элемент в эту группу, сохраняя все предыдущие (но пока не обязательные) элементы, принадлежащие другим группам, в этом буфере для последующего использования.

Все это означает, что groupBy не может знать, были ли найдены все элементы группы до тех пор, пока источник Observable не сообщит о завершении, тогда он будет использовать все оставшиеся буферизованные элементы и затем сообщит о завершении группированным Observable s. .

Проще говоря: Observable s, генерируемые groupBy, не завершаются, пока не завершится источник Observable.

Когда вы соберете всю эту информацию вместе, вы поймете, что только после того, как источник Observable (ваш Observable.range(0, 130)) был завершен, первая группа Observable также будет завершена, и из-за flatMap только тогда все остальные будут сгруппированы Observable s.

Поскольку из вашего последнего вопроса я знаю, что вы пытаетесь построить веб-сокет, использование flatMap является плохой идеей - ваш источник Observable входящих запросов не будет никогда завершен, эффективно только обслуживая самый первый IP-адрес, с которым вы столкнетесь.

Вместо этого вы должны использовать mergeMap. При сравнении с concatMap mergeMap не заботится о последовательности элементов, вместо этого применяется правило «первым пришел - первым обслужен». .


***: Когда вы дойдете до конца моего объяснения и, надеюсь, поймете, как работают groupBy и flatMap, вы поймете, почему я написал «в начале»!

...