Продвижение TestScheduler в RxJava для блокирующего вызова delay () - PullRequest
0 голосов
/ 19 сентября 2018

Использование RxJava2 У меня есть класс, который соединяет реактивный и нереактивный мир.В этом классе есть переопределенный метод , который возвращает некоторое значение после потенциально длительной обработки .Эта обработка встроена в реактивный мир с использованием Single.create(emitter -> ...).

Время обработки для меня крайне важно, так как есть разница в отношении бизнес-логики, будь то второй абонентпоявляется во время обработки или после того, как она завершена (или отменена).

Сейчас я тестирую этот класс - поэтому мне нужно, чтобы длительное время обработки было только виртуальным.RxJava TestScheduler на помощь!

Чтобы создать тестовую реализацию для этого базового класса моста, который эмулирует задержку, мне нужен блокирующий (!) Вызов в TestScheduler, где продвижение во временисрабатывает в отдельном потоке.(В противном случае продвижение во времени никогда не будет инициировано.)

Моя проблема в том, что Мне нужно, чтобы продвижение во времени было задержано, по крайней мере, до тех пор, пока «дорогие вычисления» не заблокируют - иначевремя получения изменено до , когда оператор задержки становится активным и, следовательно, он ждет, пока не наступит другое продвижение, которое никогда не произойдет.

Я могу решить эту проблему, позвонив по номеру Thread.sleep(100) до звонка, чтобы продвинуться во времени - но мне это кажется немного уродливым ... Как долго ждать?Для нескольких тестов это время складывается, но терпит неудачу из-за проблем с синхронизацией ... тьфу.

Любая идея, как проверить эту ситуацию, надеюсь, более чистым способом? Спасибо!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class TestSchedulerBlocking {

    public static void main(String[] args) throws Exception {

        TestScheduler testScheduler = new TestScheduler();

        // the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
        Single<String> single = Single.create(emitter -> {

            System.out.println("emitting on " + Thread.currentThread() + " ...");

            // This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
            // (e.g. method returns, or, in another case, an exception is thrown).
            // Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
            // Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
            // Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
            // I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
            // Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
            Completable.complete()
                       .delay(3, TimeUnit.SECONDS, testScheduler)
                       .doOnSubscribe(__ -> System.out.println("onSubcribe: completable"))
                       .blockingAwait();

            System.out.println("<< blocking released >>");

            // this has to be delayed! Might be, for example, a return or an exception in real world.
            emitter.onSuccess("FooBar!");

        });


        System.out.println("running the task ...");
        TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                                  .subscribeOn(Schedulers.newThread())
                                                  .test();

        // Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
        // and therefore the blockingAwait() blocks until infinity...
        System.out.println("---SLEEPING---");
        Thread.sleep(100);

        // virtually advance the blocking delay 
        System.out.println("---advancing time---");
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

        // wait for the task to complete
        System.out.println("await...");
        testObserver.await();

        // assertions on task results, etc.
        System.out.println("finished. now do some testing... values (expected [FooBar!]): " + testObserver.values());

    }

}

[Примечание: вчера я задавал этот вопрос более сложным и многословным образом - но в моем кабинете было слишком жарко, и в нем было несколько недостатков и ошибок.Поэтому я удалил его, и теперь он более сжат к реальной проблеме.]

1 Ответ

0 голосов
/ 20 сентября 2018

Если я правильно понимаю вашу проблему, у вас есть долгосрочная служба, и вы хотите иметь возможность тестировать подписки на службы, которые будут затронуты вашей долгосрочной службой.Я подошел к этому следующим образом.

Определите ваш сервис так, как вы, используя TestScheduler, чтобы контролировать его продвижение во времени.Затем определите свои тестовые стимулы, используя тот же планировщик тестов.

Это эмулирует блокирующую сущность.Не используйте «блок».

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
                          .doOnNext( _l -> {} )
                          .map( _l -> "FooBar!" )
                          .take( 1 )
                          .toSingle();

// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                            .subscribeOn(Schedulers.newThread())
                                            .test();

// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

Вы должны увидеть, что происходит performTestAction1(), затем performTestAction2(), затем single заканчивается, затем performTestAction3().Все это управляется планировщиком испытаний.

...