monix: Task.executeWithFork предотвращает выполнение? - PullRequest
0 голосов
/ 18 января 2019

Я не могу понять, почему добавление executeWithFork не позволяет запускать задачу в следующем примере:

import java.util.concurrent.TimeUnit

import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject

object Sandbox {

  def main(args: Array[String]): Unit = {
    implicit val scheduler: SchedulerService =
      monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())

    val input = ConcurrentSubject.publish[String]

    // prints nothing
    input.foreachL(println).executeWithFork.runAsync
    // this works:
    // input.foreachL(println).runAsync

    input.onNext("one")
    input.onNext("two")

    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
  }
}

1 Ответ

0 голосов
/ 19 января 2019

Поведение, которое вы видите, является результатом двух фактов:

  1. с использованием executeWithFork вводит немного дополнительной задержки для переключения потоков

  2. вы используете ConcurrentSubject.publish (в отличие от replay, например).Если вы откроете документы для PublishSubject, вы можете увидеть, что

A PublishSubject передает подписчику только те элементы, которые испускаютсяисточник после времени подписки.

Другими словами, у вас есть условие состязания между основным потоком, который публикует "one" и "two", и разветвленным потоком, который должен подписаться наinput чтобы получить данные.Результаты зависят от того, какой поток выиграет гонку: все данные, опубликованные до подписки, будут потеряны.Одно мое оборудование я почти всегда вижу "two" и очень редко даже "one", ваши результаты могут отличаться.

Самый простой способ проверить это - добавить Thread.sleep(100) перед первым input.onNext, и вы должны увидеть оба события, напечатанные каждый раз.Вы также можете попытаться выдвинуть больше событий, чем просто 2, чтобы увидеть, что не все потеряно.

...