Реактивный конвейер подписан, но в модульном тесте Mockito.verify () завершается неудачно (вызов mock не записывается) - PullRequest
0 голосов
/ 17 марта 2020

У меня есть этот класс:

@Slf4j
@RequiredArgsConstructor
@Service
public class SyncTransactionService {
    private final SyncProducerService syncProducerService; // kafka producer
    private final CouponService couponService; // db persistence service
    private final CouponUpdateMessageMapper mapper; // simple mapper to generate message dto for Kafka

    public void processChanges(List<Change> changes) {
        Flux.fromIterable(changes)
                    .map(this::processAndSend)
                    .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
                    .subscribeOn(Schedulers.elastic())
                    .subscribe();
    }

    private Mono<CouponUpdateMessage> processAndSend(Change change) {
        return Mono.fromCallable(() -> change)
                .doFirst(() -> log.info("saving or deleting the coupon: {}", change.getChanged()))
                .map(this::saveOrDelete)
                .thenReturn(mapper.map(change))
                .doOnSuccess(message -> log.info("sending message: {}", message))
                .doOnSuccess(syncProducerService::send);
    }

    private Mono<Void> saveOrDelete(Change change) {
        if (change.getType() == DELETE) return couponService.deleteCoupon(change.getChanged());
        else return couponService.saveCoupon(change.getChanged()).then();
    }

}

И этот тест:

@ExtendWith(MockitoExtension.class)
class SyncTransactionServiceTest {
    @Mock
    private SyncProducerService syncProducerService;

    @Mock
    private CouponService couponService;

    @Mock
    private CouponUpdateMessageMapper mapper;

    @InjectMocks
    private SyncTransactionService syncTransactionService;


    private static Coupon insertId1;
    private static Coupon updateId2;
    private static Coupon deleteId3;

    private static Change change1;
    private static Change change2;
    private static Change change3;


    @BeforeAll
    private static void prepareData() {
        insertId1 = DataHelper.coupon();
        updateId2 = DataHelper.coupon();
        updateId2.setId(2);
        deleteId3 = DataHelper.coupon();
        deleteId3.setId(3);

        change1 = Change.builder().changed(insertId1).type(CouponUpdateType.INSERT).build();
        change2 = Change.builder().changed(updateId2).type(CouponUpdateType.UPDATE).build();
        change3 = Change.builder().changed(deleteId3).type(CouponUpdateType.DELETE).build();
    }

    @Test
    void shouldProcessChanges() {
        // given
        List<Change> changes = List.of(change1, change2, change3);
        when(couponService.saveCoupon(insertId1)).thenReturn(Mono.just(insertId1));
        when(couponService.saveCoupon(updateId2)).thenReturn(Mono.just(updateId2));
        when(couponService.deleteCoupon(deleteId3)).thenReturn(Mono.empty());
        doNothing().when(syncProducerService).send(any());
        doCallRealMethod().when(mapper).map(any());

        // when
        syncTransactionService.processChanges(changes);

        // then
        verify(couponService, times(2)).saveCoupon(any());
        verify(mapper, times(3)).map(any());
        verify(couponService).deleteCoupon(any());
        verify(syncProducerService, times(3)).send(any());
    }
}

Когда я запускаю тест, Mockito.verify() не обнаруживает никакого взаимодействия с макетами, хотя я в коде есть subscribe().

Так в чем же проблема в моем конвейере?

Ответы [ 2 ]

1 голос
/ 17 марта 2020

Проблема в том, что тестируемый метод выполняется асинхронно из-за указанного планировщика. Вы должны вернуть Flux из вашего тестируемого метода, а затем использовать StepVerifier или вызывать collectList() и block() методы Flux для запуска и ожидания выполнения.

0 голосов
/ 18 марта 2020

Точно так же, как сказал @Martin Tarjányi, если метод реагирования должен быть протестирован с использованием Schedulers.elastic(), он запустит асин c заданий, которые вы не можете немедленно завершить sh, и поэтому я не вижу никаких взаимодействий.

Если я придерживаюсь этого, я могу:

  • дождаться его окончания (используйте https://github.com/awaitility/awaitility lib или просто Thread.sleep(), например: Awaitility.waitAtMost(Duration.ofMillis(2000)).untilAsserted(() -> {verify(...);});)
  • или верните конвейер и проверьте его с помощью StepVerifier или block(). Помните, для потока, используйте blockLast(), чтобы получить все; blockFirst() испускает только первый элемент.

Вот так вот сейчас:

...
    public Flux<CouponUpdateMessage> processChanges(List<Change> changes) {
        return Flux.fromIterable(changes)
                .flatMap(this::processAndSend)
                .doOnError(e -> log.error("Cannot sync coupon with change. ", e))
                .subscribeOn(Schedulers.elastic()); // don't subscribe() here, but return it
    }
...

И тест:

...
   // when
   syncTransactionService.processChanges(changes).blockLast(); // process all elements

...

И Я вижу журналы, и все взаимодействия записываются как я: sh.


Если я не обязан использовать Schedulers.elastic(), я могу просто просто subscribe(), и тест в вопрос будет работать.

...