Я не уверен, что опубликованный код ведет себя так, как вы ожидаете. Ваш метод getStuffByToken(...)
фактически вызывается в вызывающем потоке, а не Scheduler
.
. Для простоты позвольте мне заменить Token
на Integer
и Stuff
на String
.
My getStuffByToken(...)
собирается вернуть String
представление Integer
и также будет включать текущее имя Thread
:
private List<String> getStuffByToken( Integer token )
{
return Arrays.asList( token.toString(), Thread.currentThread().getName() );
}
Я могу быть на другом версия Rx Java, у меня нет toBlocking(...)
метода, но есть blockingIterable()
- я надеюсь, что это эквивалентно:
public List<String> extractStuff(Scheduler scheduler, List<Integer> tokens) {
List<Observable<List<String>>> observables = tokens
.stream()
.map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
.collect(Collectors.toList());
List<String> result = new ArrayList<>();
for (List<String> stuff: Observable.merge(observables).blockingIterable()) {
result.addAll(stuff);
}
return result;
}
Если мы протестируем выше:
@Test
public void testExtractStuff()
{
List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
List<String> result = extractStuff( Schedulers.computation(), tokens );
System.out.println( result );
}
Мы возвращаемся:
[1, main, 3, main, 4, main, 5, main, 2, main]
Как вы можете сказать, все getStuffByToken(...)
вызовы были выполнены на main Thread
.
Далее, причина, по которой ваш тестовый метод не Это вызвано тем, что TestScheduler
требует вызова TestScheduler.advanceTimeBy(...)
для имитации времени, которое приводит к обработке вашего конвейера Rx. Поскольку ваш метод является методом блокировки, его тестирование с помощью TestScheduler
.
будет не очень удобно. Вооружившись обеими данными выше, я предлагаю вам сделать что-то вроде этого:
public Single<List<String>> extractStuff( Scheduler scheduler, List<Integer> tokens )
{
return Observable.fromIterable( tokens )
.flatMap( token -> Observable.just( token )
.subscribeOn( scheduler )
.map( this::getStuffByToken )
.flatMap( Observable::fromIterable ))
.toList();
}
Ваш производственный код может позвонить extractStuff(...).blockingGet()
для разрешения List
.
И вы можете проверить следующим образом:
@Test
public void testExtractStuff()
{
TestScheduler scheduler = new TestScheduler();
List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
TestObserver<List<String>> test = extractStuff( scheduler, tokens ).test();
scheduler.advanceTimeBy( 1, TimeUnit.SECONDS );
test.assertValueCount( 1 );
test.assertValue( list -> list.size() == 10 );
test.assertComplete();
}