Неблокирующие функциональные методы с Reactive Mongo и веб-клиентом - PullRequest
0 голосов
/ 21 января 2019

У меня есть микро-сервис, который читает объекты из базы данных, используя интерфейс ReactiveMongoRepository.

Цель состоит в том, чтобы взять каждый из этих объектов и передать его в лямбда-функцию AWS (после преобразования его в DTO).Если результат этой лямбда-функции находится в диапазоне 200, отметьте объект как успешный, иначе проигнорируйте.

В старые времена простого хранилища Монго и шаблона RestTemplate это было бы тривиальной задачей.Тем не менее, я пытаюсь понять эту сделку и избегать блокировок.

Вот код, который я придумал, я знаю, что блокирую на webClient, но как мне этого избежать?

@Override
public Flux<Video> index() {
    return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        // Blocking call
        final HttpStatus httpStatus = webClient.post()
                .uri(URI.create(LAMBDA_ENDPOINT))
                .body(BodyInserters.fromObject(searchDTO)).exchange()
                .block()
                .statusCode();

        if (httpStatus.is2xxSuccessful()) {
            video.setIndexed(true);
        }

        return videoRepository.save(video);
    });
}

Я вызываю вышеупомянутое из запланированной задачи, и меня не волнует фактический результат метода index (), только то, что происходит во время.

@Scheduled(fixedDelay = 60000)
public void indexTask() {
    indexService
            .index()
            .log()
            .subscribe();
}

Я прочитал кучу постов в блогах и т. Д. По этой теме, но все они - просто простые операции CRUD, но в середине ничего не происходит, так что не дайте мне полной картины того, какреализовать эти вещи.

Любая помощь?

Ответы [ 2 ]

0 голосов
/ 21 января 2019

мой подход, может быть, немного более читабельным.Но я признаю, что не запускал его, поэтому нет 100% гарантии, что он будет работать.

public Flux<Video> index() {
    return videoRepository.findAll()
        .flatMap(this::callLambda)
        .flatMap(videoRepository::save);
}

private Mono<Video> callLambda(final Video video) {
    SearchDTO searchDTO = new SearchDTO(video);
    return webClient.post()
            .uri(URI.create(LAMBDA_ENDPOINT))
            .body(BodyInserters.fromObject(searchDTO))
            .exchange()
            .map(ClientResponse::statusCode)
            .filter(HttpStatus::is2xxSuccessful)
            .map(t -> {
                video.setIndexed(true);
                return video;
            });
}
0 голосов
/ 21 января 2019

Ваше решение на самом деле довольно близко.В этих случаях вы должны попытаться поэтапно разложить реактивную цепочку и без колебаний превращать биты в независимые методы для ясности.

@Override
public Flux<Video> index() {

    Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
    return unindexedVideos.flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        Mono<ClientResponse> indexedResponse = webClient.post()
            .uri(URI.create(LAMBDA_ENDPOINT))
            .body(BodyInserters.fromObject(searchDTO)).exchange()
            .filter(res -> res.statusCode().is2xxSuccessful());

        return indexedResponse.flatMap(response -> {
            video.setIndexed(true);
            return videoRepository.save(video);
        });
    });
...