Rx Java - Как проверить функцию с внутренним вызовом toBlocking - PullRequest
3 голосов
/ 31 марта 2020

Я пытаюсь добавить некоторые модульные тесты к следующему коду:

public List<Stuff> extractStuff(Scheduler scheduler, List<Token> tokens) {
    List<Observable<List<Stuff>>> observables = tokens
            .stream()
            .map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());

    List<Stuff> result = new ArrayList<>();
    for (List<Stuff> stuff: Observable.merge(observables).toBlocking().toIterable()) {
        result.addAll(stuff);
    }

    return result;
}

Я хочу, чтобы Stuff объекты выбирались с параллелизмом, но мне нужно собрать все из них перед тем, как далее (в противном случае следующий процесс не имеет никакого смысла).

Код работает, как и ожидалось, но я борюсь с юнит-тестами:

@Test
public void extractStuff() {
    // GIVEN
    TestScheduler scheduler = new TestScheduler();
    List<Token> tokens = buildTokens();
    ...
    // WHEN
    List<Stuff> result = this.instance.extractStuff(scheduler, tokens);
    // Execution never comes to this point...
    // THEN
    ...
}

Используя отладчик, я вижу, что Observable.just(...) выглядит хорошо (мой список наблюдаемых не пуст, и я могу видеть внутри себя мой смоделированный объект).

Проблема: выполнение, похоже, застряло в выражении Observable.merge(observables).toBlocking() , Строка result.addAll никогда не вызывается.

Я пробовал несколько вещей с объектом TestScheduler, но мне не удается заставить его работать. Большинство примеров, которые я нашел в Inte rnet, имеют дело с функцией, которая возвращает объект Observable, поэтому соответствующий модульный тест может запускать scheduler.advanceTimeBy(...);. В моей ситуации я не могу применить этот подход, так как Observable объект не возвращается напрямую.

Большое спасибо за вашу помощь!

1 Ответ

1 голос
/ 06 апреля 2020

Я не уверен, что опубликованный код ведет себя так, как вы ожидаете. Ваш метод getStuffByToken(...) фактически вызывается в вызывающем потоке, а не Scheduler.

. Для простоты позвольте мне заменить Token на Integer и Stuff на String.

My getStuffByToken(...) собирается вернуть String представление Integer и также будет включать текущее имя Thread:

private List<String> getStuffByToken( Integer token )
{
    return Arrays.asList( token.toString(), Thread.currentThread().getName() );
}

Я могу быть на другом версия Rx Java, у меня нет toBlocking(...) метода, но есть blockingIterable() - я надеюсь, что это эквивалентно:

public List<String> extractStuff(Scheduler scheduler, List<Integer> tokens) {
    List<Observable<List<String>>> observables = tokens
            .stream()
            .map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());

    List<String> result = new ArrayList<>();
    for (List<String> stuff: Observable.merge(observables).blockingIterable()) {
        result.addAll(stuff);
    }

    return result;
}

Если мы протестируем выше:

@Test
public void testExtractStuff()
{
    List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
    List<String> result = extractStuff( Schedulers.computation(), tokens );
    System.out.println( result );
}

Мы возвращаемся:

[1, main, 3, main, 4, main, 5, main, 2, main]

Как вы можете сказать, все getStuffByToken(...) вызовы были выполнены на main Thread.

Далее, причина, по которой ваш тестовый метод не Это вызвано тем, что TestScheduler требует вызова TestScheduler.advanceTimeBy(...) для имитации времени, которое приводит к обработке вашего конвейера Rx. Поскольку ваш метод является методом блокировки, его тестирование с помощью TestScheduler.

будет не очень удобно. Вооружившись обеими данными выше, я предлагаю вам сделать что-то вроде этого:

public Single<List<String>> extractStuff( Scheduler scheduler, List<Integer> tokens )
{
    return Observable.fromIterable( tokens )
        .flatMap( token -> Observable.just( token )
                .subscribeOn( scheduler )
                .map( this::getStuffByToken )
                .flatMap( Observable::fromIterable ))
        .toList();
}

Ваш производственный код может позвонить extractStuff(...).blockingGet() для разрешения List.

И вы можете проверить следующим образом:

@Test
public void testExtractStuff()
{
    TestScheduler scheduler = new TestScheduler();
    List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
    TestObserver<List<String>> test = extractStuff( scheduler, tokens ).test();

    scheduler.advanceTimeBy( 1, TimeUnit.SECONDS );
    test.assertValueCount( 1 );
    test.assertValue( list -> list.size() == 10 );
    test.assertComplete();
}
...