У меня есть класс, расширяющий FlowableTransformer,
Я пытаюсь использовать PublishSubject или PublishProcessor для запуска Observable с этим составом:
val transformerClass = TestTranformer()
val testTriger = PublishProcessor.create<Int>()
var testObservable = testTriger.filter {
// Called with the same code in instrumented Test but not here in Unit Test
println("filter $it")
it == 1}.map {
// Called with the same code in instrumented Test but not here in Unit Test
println("mapping")
EnterValueForTransformer() }.compose(transformerClass)
testTriger.subscribe()
// testSubscriber is a TestSubscriber user in Unit Test
testObservable.subscribe(testSubscriber)
testTriger.onNext(1)
testTriger.onNext(2)
Вот код TransformerКласс:
package com.example.myapplication.reproduce
import io.reactivex.Flowable
import io.reactivex.FlowableTransformer
import org.reactivestreams.Publisher
// A test class in a test Project
class TestTransformer : FlowableTransformer<TestTransformer.UpStreamRequest, TestTransformer.DownStreamResponse> {
val testClass = TestClass()
class UpStreamRequest
sealed class DownStreamResponse {
data class Example(testText : String) : DownStreamResponse()
}
override fun apply(upstream: Flowable<UpStreamRequest>): Publisher<DownStreamResponse> {
println("pass method apply")
// return a Single<DownStreamResponse>
return testClass.singleRequest("Test").toFlowable()
}
}
Ни вызывается метод filter (), ни map (), вызывается метод применения Transformer,
Когда я удаляю оператор compose (), фильтр и mapcall,
Похоже, это связано с модульным тестом, потому что в инструментальном тесте он работает нормально, я пытался установить планировщики с помощью класса RxJavaPlugin и даже пытался TestScheduler с задержкой,
Спасибо