Вот код RxJava, который я хочу протестировать:
public void triggerCancelOrderJob() {
couchConnector()
.findAbandonedOpenOrders()
.flatMap(results -> results.rows())
.flatMap(
row ->
Observable.just(row)
.subscribeOn(Schedulers.io())
.map(
s -> return s.value())
.flatMap(
orderId -> {
return RxReactiveStreams.toObservable(
serviceTokenCache
.get(OrderApiConstants.SERVICE_TOKEN_CACHE_KEY)
.flatMap(
issueToken -> {
return cancelOrderApiConnector()
.invokeAPI(
RequestInputModel.builder().build(),
RequestInputModel.RequestBodyModel.builder().build());
}));
}))
.subscribe(//additional code)
Итак, я запускаю асинхронный запрос CB, получаю Observable< AsyncN1qlQueryResult >
, затем для каждой строки, которую я вызываю, вызываю два внешнихуслуги один за другим (первый звонок на serviceTokenCache
и второй звонок на cancelOrderApiConnector
).Каждая строка работает в отдельном потоке IO
.
Примечание: serviceTokenCache.get()
и cancelOrderApiConnector().invokeAPI()
возвращают Mono
соответственно.
Я не могу понять, как проверить этот код.Какие все компоненты необходимо проверить?Поскольку каждая строка будет выполняться в отдельном потоке, я не могу понять, как тестировать такой асинхронный код.