Я хотел бы реализовать «наследование планировщика» как часть API, использующего RxJava2.Я хочу, чтобы потребители моего API могли мыслить с точки зрения построения единой цепочки обработки, а не DAG, хотя внутренне новые события включаются в подробности реализации.
Я нелюбой способ сделать эквивалент:
observable
.flatMap {
val scheduler = Schedulers().current!!
someOtherObservable
.observeOn(scheduler)
}
Есть ли другой способ наследовать планировщик?
Больше контекста
У меня есть конвейер, подобный:
compositeDisposable += Environment
.lookupDeviceInfo()
.subscribeOn(scheduler)
.flatMap { deviceInfo ->
Device(deviceId = deviceInfo.id)
.sendCommand()
.subscribe(
{ result -> /*process result*/ },
{ e -> /*log error*/ })
Для потребителя это выглядит так, как будто они перенесли всю работу на указанный scheduler
: события из lookupDeviceInfo()
передаются работнику из этого планировщика, и они ожидают прилипания к этому работнику.
На практике у них есть ошибка, потому что sendCommand()
показывает события из другого источника событий в качестве детали реализации:
sendMessageSingle(deviceId, payload)
.flatMap { sentMessageId ->
responseObservable
.filter { it.messageId == sentMessageId }
.firstOrError()
}
Поток событий из responseObservable
, но ни один из нихсобытия направляются в указанный scheduler
, потому что он применяется в восходящем направлении.