Spring WebFlux проект, который подключается к ElasticSearch Unit Testing - PullRequest
0 голосов
/ 05 июня 2018

В настоящее время я работаю над проектом Spring WebFlux, который подключается к ElasticSearch.У меня есть служба отдыха, которая, в свою очередь, вызывает метод на уровне сервиса, который подключается к ES.У меня проблемы с написанием UnitTests для моего сервисного уровня.Буду признателен за любую помощь, поскольку я впервые работаю с Reactive Programming.Ниже приведены фрагменты кода для методов Controller и Service.

Код контроллера:

@GetMapping(path = "/api/apis/services/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  Flux<ClassA> serviceApis(@PathVariable final String serviceKey) {
        return apiService.getDataForService(serviceKey);
  }

Уровень обслуживания:

@PreAuthorize("isFullyAuthenticated()")
    public Flux<ClassA> getDataForService(
            final String id) {

        IdentityToken token = GSLSecurityContext.getCurrentUserIdentityToken();
        if (token == null) {
            return Flux.error(new Exception("No token found"));
        }

        String securityQueryJson = getSecurityShould(token);

        String queryToRun = QUERY
                .replace("XXX_SIZE_XXX", config.getValueAsString("scroll.size"))
                .replace("XXX_SECURITY_SHOULD_XXX", securityQueryJson)
                .replace("XXX_SERVICE_KEY_XXX", id);

        WebClient client = ClientUtil.getDataLakeWebClient(config);
        Flux<ClassA> response = getData(client, queryToRun);
        return response;
    }

Код getData приведен ниже:

protected Flux<ClassA> getData(
        final WebClient client,
        final String queryToRun) {
    String scrollTimeoutQuery = "?scroll=" + config.getValueAsString("scroll.timeout");
    long timeout = config.getValueAsLong("query.timout");
    return Flux.generate(
        () -> null,
        (scrollId, sink) -> {
            ClassAWrapper lastWrapper = null;
            if (scrollId == null) {
                Mono<ClassAWrapper> wrapper = client.post()
                    .uri(getSearchURI() + scrollTimeoutQuery)
                    .body(BodyInserters.fromObject(queryToRun)).retrieve()
                    .bodyToMono(ClassAWrapper.class)
                    .onErrorMap(original -> new Exception("Unable to retrieve from elastic search for query " + queryToRun, original))
                    .log();
                try {
                    lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
                } catch (IllegalStateException ex) {
                    LOG.error("Timedout after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
                    lastWrapper = null;
                } catch (Exception ex) {
                    LOG.error("Error in getting message details",ex);
                    lastWrapper = null;
                } 
            } else {
                String scrollQuery = "{\"scroll\" : \"" + config.getValueAsString("scroll.timeout") + "\", \"scroll_id\" : \"" + scrollId + "\"}";
                Mono<ClassAWrapper> wrapper = client.post()
                        .uri("_search/scroll")
                        .body(BodyInserters.fromObject(scrollQuery)).retrieve()
                        .bodyToMono(ClassAWrapper.class)
                        .onErrorMap(original -> new Exception("Unable to retrieve next page of data from elastic search", original))
                        .log();
                try {
                    lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
                } catch (IllegalStateException ex) {
                    LOG.error("Timeout after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
                    lastWrapper = null;
                } catch (Exception ex) {
                    LOG.error("Error in getting message details",ex);
                    lastWrapper = null;
                } 
            }
            if (lastWrapper == null || lastWrapper.getResult() == null || lastWrapper.getResult().getDetails().isEmpty()) {
                sink.next(new ClassA());
                sink.complete();
                return null;
            }
            sink.next(lastWrapper.getResult());
            return lastWrapper.getScrollId();
        }
    );
}

Здесь queryToRun - это запрос ES, который должен быть выполнен.конфиг это конфигурация.Мне нужно проверить метод "getDataForService ()".

...