Использование RxJava2 У меня есть класс, который соединяет реактивный и нереактивный мир.В этом классе есть переопределенный метод , который возвращает некоторое значение после потенциально длительной обработки .Эта обработка встроена в реактивный мир с использованием Single.create(emitter -> ...)
.
Время обработки для меня крайне важно, так как есть разница в отношении бизнес-логики, будь то второй абонентпоявляется во время обработки или после того, как она завершена (или отменена).
Сейчас я тестирую этот класс - поэтому мне нужно, чтобы длительное время обработки было только виртуальным.RxJava TestScheduler
на помощь!
Чтобы создать тестовую реализацию для этого базового класса моста, который эмулирует задержку, мне нужен блокирующий (!) Вызов в TestScheduler, где продвижение во временисрабатывает в отдельном потоке.(В противном случае продвижение во времени никогда не будет инициировано.)
Моя проблема в том, что Мне нужно, чтобы продвижение во времени было задержано, по крайней мере, до тех пор, пока «дорогие вычисления» не заблокируют - иначевремя получения изменено до , когда оператор задержки становится активным и, следовательно, он ждет, пока не наступит другое продвижение, которое никогда не произойдет.
Я могу решить эту проблему, позвонив по номеру Thread.sleep(100)
до звонка, чтобы продвинуться во времени - но мне это кажется немного уродливым ... Как долго ждать?Для нескольких тестов это время складывается, но терпит неудачу из-за проблем с синхронизацией ... тьфу.
Любая идея, как проверить эту ситуацию, надеюсь, более чистым способом? Спасибо!
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
public class TestSchedulerBlocking {
public static void main(String[] args) throws Exception {
TestScheduler testScheduler = new TestScheduler();
// the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
Single<String> single = Single.create(emitter -> {
System.out.println("emitting on " + Thread.currentThread() + " ...");
// This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
// (e.g. method returns, or, in another case, an exception is thrown).
// Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
// Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
// Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
// I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
// Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
Completable.complete()
.delay(3, TimeUnit.SECONDS, testScheduler)
.doOnSubscribe(__ -> System.out.println("onSubcribe: completable"))
.blockingAwait();
System.out.println("<< blocking released >>");
// this has to be delayed! Might be, for example, a return or an exception in real world.
emitter.onSuccess("FooBar!");
});
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
.subscribeOn(Schedulers.newThread())
.test();
// Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
// and therefore the blockingAwait() blocks until infinity...
System.out.println("---SLEEPING---");
Thread.sleep(100);
// virtually advance the blocking delay
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
// wait for the task to complete
System.out.println("await...");
testObserver.await();
// assertions on task results, etc.
System.out.println("finished. now do some testing... values (expected [FooBar!]): " + testObserver.values());
}
}
[Примечание: вчера я задавал этот вопрос более сложным и многословным образом - но в моем кабинете было слишком жарко, и в нем было несколько недостатков и ошибок.Поэтому я удалил его, и теперь он более сжат к реальной проблеме.]