Как реализовать наследование планировщика, когда другой источник врезается в конвейер? - PullRequest
0 голосов
/ 24 сентября 2018

Я хотел бы реализовать «наследование планировщика» как часть API, использующего RxJava2.Я хочу, чтобы потребители моего API могли мыслить с точки зрения построения единой цепочки обработки, а не DAG, хотя внутренне новые события включаются в подробности реализации.

Я нелюбой способ сделать эквивалент:

observable
.flatMap {
  val scheduler = Schedulers().current!!
  someOtherObservable
    .observeOn(scheduler)
}

Есть ли другой способ наследовать планировщик?

Больше контекста

У меня есть конвейер, подобный:

compositeDisposable += Environment
  .lookupDeviceInfo()
  .subscribeOn(scheduler)
  .flatMap { deviceInfo ->
    Device(deviceId = deviceInfo.id)
      .sendCommand()
  .subscribe(
    { result -> /*process result*/ },
    { e -> /*log error*/ })

Для потребителя это выглядит так, как будто они перенесли всю работу на указанный scheduler: события из lookupDeviceInfo() передаются работнику из этого планировщика, и они ожидают прилипания к этому работнику.

На практике у них есть ошибка, потому что sendCommand() показывает события из другого источника событий в качестве детали реализации:

sendMessageSingle(deviceId, payload)
.flatMap { sentMessageId ->
  responseObservable
  .filter { it.messageId == sentMessageId }
  .firstOrError()
}

Поток событий из responseObservable, но ни один из нихсобытия направляются в указанный scheduler, потому что он применяется в восходящем направлении.

1 Ответ

0 голосов
/ 03 октября 2018

Из комментариев:

Для возврата к одному и тому же потоку планировщика необходимо предоставить однопоточный планировщик (т. Е. Schedulers.from(Executor), Schedulers.single() и т. Д.).Нет текущего планировщика, потому что нет гарантии, что какой-либо код будет работать на любом из стандартных планировщиков;они могут выполняться в произвольных потоках системы, других платформах и т. д. Таким образом, вы должны перенаправить сигналы обратно в нужный поток через observeOn.

Меня не беспокоит посадкав том же потоке, точно такой же планировщик.(Даже смена Workers может подойти, если новый работник работает с тем же планировщиком, что и старый.)

Тогда это предложение остается в силе, и вы можете отказаться от свойства «однопоточный»Я упоминал.

...