Сохраняйте поток записей, используя данные пружины elasticsearch в реактивном режиме - PullRequest
0 голосов
/ 09 июля 2020

Пожалуйста, помогите мне найти правильный способ сохранить поток сущностей, полученных от Кассандры, в Elasticserch.

Например, у меня огромный поток сущностей:

Flux<User> user = cassandraService.getUsers();

Как я могу сохранить весь поток пользователей в Elasticsearch реактивным способом? Какой метод вызова Elasticksearch лучше использовать:

  • user.map (u-> reactiveElasticsearchOperations.save (u, INDEX_NAME));
  • user.flatMap (u-> reactiveElasticsearchOperations .save (u, INDEX_NAME));
  • user.onNext (u-> reactiveElasticsearchOperations.save (u, INDEX_NAME));
  • ... что-то другое .. ??

А как заблокировать, пока не будет обработана последняя сущность в потоке?

1 Ответ

0 голосов
/ 17 июля 2020

Во-первых, краткое объяснение flatMap() vs map():

  • flatMap() следует использовать для неблокирующих операций (асинхронных) - короче говоря, все, что возвращает Mono, Flux ,
  • map() следует использовать для преобразования объекта путем применения синхронной функции к каждому элементу (выполняется в фиксированное время).

Итак, если вы хотите сохранить документ в Elasticsearch вам следует использовать flatMap() - это асинхронная c операция, и время ее выполнения не определено c.

Когда дело доходит до doOnNext(), оно используется для добавления поведения побочных эффектов, срабатывает, когда Flux излучает элемент, например, для регистрации или какого-либо другого действия, которое должно произойти после завершения сохранения.

Пример:

Mono.just(productDto)
    .map(dto -> new Product(dto.getName(), dto.isAvailable()))
    .flatMap(elasticsearchOperations::save)
    .doOnNext(savedProduct -> logger.info("Saved {}", savedProduct.getProductName()))
    .subscribe();

А вы не необходимо заблокировать на save(), это ответственность flatMap().

...