Я получаю прерывистое пропущенное значение при повторных вызовах одного и того же (MongoDB) вызова базы данных, который я преобразовал в Observable. Я удалил весь код базы данных, чтобы получить минимальный контрольный пример, в котором есть только биты Monix, и я до сих пор иногда пропускаю значения - обычно один или два на 2000 тестов.
В соответствии с документами ConcurrentSubject означает «не нужно следовать контракту противодействия», но я получаю схожие ошибки независимо от того, делаю я это или нет.
import monix.eval.Task
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import org.scalatest.FunSuite
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class Test_JustMonix extends FunSuite {
implicit val scheduler = monix.execution.Scheduler.global
def build(): Observable[Boolean] = {
val subject = ConcurrentSubject(MulticastStrategy.publish[Boolean])
subject.doAfterSubscribe {
Task.eval {
subject.onNext(true)
subject.onComplete()
}
}
}
test("just monix") {
(0 until 20).foreach { loop =>
println(s"loop $loop")
val tOpts = (0 until 100).map { _ => build().firstOptionL }
val tDone = Task.gather(tOpts).map { list =>
val emptyCount = list.count(_.isEmpty)
assert(emptyCount === 0)
}
Await.result(tDone.runToFuture, Duration.Inf)
}
println("Finished")
}
}
На некоторых запусках все циклы 20x100 завершаются правильно - firstOptionL isDefined для всех 2000 результатов. Однако более 50% времени срабатывает assert (emptyCount === 0), когда значение равно 1 или иногда 2, что указывает на то, что иногда я получаю значение None, как если бы onComplete происходило до onNext?
Это может произойти в любом из 20 циклов, поэтому похоже, что это либо состояние гонки, либо я неправильно понимаю требуемый ввод. Я перепробовал почти все темы - PublishSubject, с BufferedSubscriber и без него, и все они дают схожие результаты.
Я также пытался отложить onComplete до Ack через
subject.onNext(true).map(_=> subject.onComplete())
и это, похоже, провалится немного раньше.
Я также попробовал MulticastStrategy.replay без разницы.
Я использую Monix 3.0.0-RC3 на Scala 2.12.8.