Я получаю противоречивые результаты от Monix firstOptionL - состояние гонки? - PullRequest
1 голос
/ 28 июня 2019

Я получаю прерывистое пропущенное значение при повторных вызовах одного и того же (MongoDB) вызова базы данных, который я преобразовал в Observable. Я удалил весь код базы данных, чтобы получить минимальный контрольный пример, в котором есть только биты Monix, и я до сих пор иногда пропускаю значения - обычно один или два на 2000 тестов.

В соответствии с документами ConcurrentSubject означает «не нужно следовать контракту противодействия», но я получаю схожие ошибки независимо от того, делаю я это или нет.

import monix.eval.Task
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import org.scalatest.FunSuite

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class Test_JustMonix extends FunSuite {

  implicit val scheduler = monix.execution.Scheduler.global

  def build(): Observable[Boolean] = {
    val subject = ConcurrentSubject(MulticastStrategy.publish[Boolean])
    subject.doAfterSubscribe {
      Task.eval {
        subject.onNext(true)
        subject.onComplete()
      }
    }
  }

  test("just monix") {
    (0 until 20).foreach { loop =>
      println(s"loop $loop")
      val tOpts = (0 until 100).map { _ => build().firstOptionL }
      val tDone = Task.gather(tOpts).map { list =>
        val emptyCount = list.count(_.isEmpty)
        assert(emptyCount === 0)
      }
      Await.result(tDone.runToFuture, Duration.Inf)
    }
    println("Finished")
  }
}

На некоторых запусках все циклы 20x100 завершаются правильно - firstOptionL isDefined для всех 2000 результатов. Однако более 50% времени срабатывает assert (emptyCount === 0), когда значение равно 1 или иногда 2, что указывает на то, что иногда я получаю значение None, как если бы onComplete происходило до onNext?

Это может произойти в любом из 20 циклов, поэтому похоже, что это либо состояние гонки, либо я неправильно понимаю требуемый ввод. Я перепробовал почти все темы - PublishSubject, с BufferedSubscriber и без него, и все они дают схожие результаты.

Я также пытался отложить onComplete до Ack через

subject.onNext(true).map(_=> subject.onComplete())

и это, похоже, провалится немного раньше.

Я также попробовал MulticastStrategy.replay без разницы.

Я использую Monix 3.0.0-RC3 на Scala 2.12.8.

...