У меня есть микро-сервис, который читает объекты из базы данных, используя интерфейс ReactiveMongoRepository
.
Цель состоит в том, чтобы взять каждый из этих объектов и передать его в лямбда-функцию AWS (после преобразования его в DTO).Если результат этой лямбда-функции находится в диапазоне 200, отметьте объект как успешный, иначе проигнорируйте.
В старые времена простого хранилища Монго и шаблона RestTemplate это было бы тривиальной задачей.Тем не менее, я пытаюсь понять эту сделку и избегать блокировок.
Вот код, который я придумал, я знаю, что блокирую на webClient
, но как мне этого избежать?
@Override
public Flux<Video> index() {
return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
// Blocking call
final HttpStatus httpStatus = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.block()
.statusCode();
if (httpStatus.is2xxSuccessful()) {
video.setIndexed(true);
}
return videoRepository.save(video);
});
}
Я вызываю вышеупомянутое из запланированной задачи, и меня не волнует фактический результат метода index (), только то, что происходит во время.
@Scheduled(fixedDelay = 60000)
public void indexTask() {
indexService
.index()
.log()
.subscribe();
}
Я прочитал кучу постов в блогах и т. Д. По этой теме, но все они - просто простые операции CRUD, но в середине ничего не происходит, так что не дайте мне полной картины того, какреализовать эти вещи.
Любая помощь?