В настоящее время я работаю над проектом Spring WebFlux, который подключается к ElasticSearch.У меня есть служба отдыха, которая, в свою очередь, вызывает метод на уровне сервиса, который подключается к ES.У меня проблемы с написанием UnitTests для моего сервисного уровня.Буду признателен за любую помощь, поскольку я впервые работаю с Reactive Programming.Ниже приведены фрагменты кода для методов Controller и Service.
Код контроллера:
@GetMapping(path = "/api/apis/services/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<ClassA> serviceApis(@PathVariable final String serviceKey) {
return apiService.getDataForService(serviceKey);
}
Уровень обслуживания:
@PreAuthorize("isFullyAuthenticated()")
public Flux<ClassA> getDataForService(
final String id) {
IdentityToken token = GSLSecurityContext.getCurrentUserIdentityToken();
if (token == null) {
return Flux.error(new Exception("No token found"));
}
String securityQueryJson = getSecurityShould(token);
String queryToRun = QUERY
.replace("XXX_SIZE_XXX", config.getValueAsString("scroll.size"))
.replace("XXX_SECURITY_SHOULD_XXX", securityQueryJson)
.replace("XXX_SERVICE_KEY_XXX", id);
WebClient client = ClientUtil.getDataLakeWebClient(config);
Flux<ClassA> response = getData(client, queryToRun);
return response;
}
Код getData приведен ниже:
protected Flux<ClassA> getData(
final WebClient client,
final String queryToRun) {
String scrollTimeoutQuery = "?scroll=" + config.getValueAsString("scroll.timeout");
long timeout = config.getValueAsLong("query.timout");
return Flux.generate(
() -> null,
(scrollId, sink) -> {
ClassAWrapper lastWrapper = null;
if (scrollId == null) {
Mono<ClassAWrapper> wrapper = client.post()
.uri(getSearchURI() + scrollTimeoutQuery)
.body(BodyInserters.fromObject(queryToRun)).retrieve()
.bodyToMono(ClassAWrapper.class)
.onErrorMap(original -> new Exception("Unable to retrieve from elastic search for query " + queryToRun, original))
.log();
try {
lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
} catch (IllegalStateException ex) {
LOG.error("Timedout after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
lastWrapper = null;
} catch (Exception ex) {
LOG.error("Error in getting message details",ex);
lastWrapper = null;
}
} else {
String scrollQuery = "{\"scroll\" : \"" + config.getValueAsString("scroll.timeout") + "\", \"scroll_id\" : \"" + scrollId + "\"}";
Mono<ClassAWrapper> wrapper = client.post()
.uri("_search/scroll")
.body(BodyInserters.fromObject(scrollQuery)).retrieve()
.bodyToMono(ClassAWrapper.class)
.onErrorMap(original -> new Exception("Unable to retrieve next page of data from elastic search", original))
.log();
try {
lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
} catch (IllegalStateException ex) {
LOG.error("Timeout after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
lastWrapper = null;
} catch (Exception ex) {
LOG.error("Error in getting message details",ex);
lastWrapper = null;
}
}
if (lastWrapper == null || lastWrapper.getResult() == null || lastWrapper.getResult().getDetails().isEmpty()) {
sink.next(new ClassA());
sink.complete();
return null;
}
sink.next(lastWrapper.getResult());
return lastWrapper.getScrollId();
}
);
}
Здесь queryToRun - это запрос ES, который должен быть выполнен.конфиг это конфигурация.Мне нужно проверить метод "getDataForService ()".