Точно так же, как сказал @Martin Tarjányi, если метод реагирования должен быть протестирован с использованием Schedulers.elastic()
, он запустит асин c заданий, которые вы не можете немедленно завершить sh, и поэтому я не вижу никаких взаимодействий.
Если я придерживаюсь этого, я могу:
- дождаться его окончания (используйте
https://github.com/awaitility/awaitility
lib или просто Thread.sleep()
, например: Awaitility.waitAtMost(Duration.ofMillis(2000)).untilAsserted(() -> {verify(...);});
) - или верните конвейер и проверьте его с помощью
StepVerifier
или block()
. Помните, для потока, используйте blockLast()
, чтобы получить все; blockFirst()
испускает только первый элемент.
Вот так вот сейчас:
...
public Flux<CouponUpdateMessage> processChanges(List<Change> changes) {
return Flux.fromIterable(changes)
.flatMap(this::processAndSend)
.doOnError(e -> log.error("Cannot sync coupon with change. ", e))
.subscribeOn(Schedulers.elastic()); // don't subscribe() here, but return it
}
...
И тест:
...
// when
syncTransactionService.processChanges(changes).blockLast(); // process all elements
...
И Я вижу журналы, и все взаимодействия записываются как я: sh.
Если я не обязан использовать Schedulers.elastic()
, я могу просто просто subscribe()
, и тест в вопрос будет работать.