как правильно сохранять данные в MongoDb, выбрасывать Spring-данные в неблокирующий стек, используя CompletableFuture - PullRequest
0 голосов
/ 28 марта 2020

Вопрос можно резюмировать: как правильно сохранять данные в MongoDb, выбрасывать Spring-данные в неблокирующий стек, используя CompletableFuture (т.е. Spring Webflux + реактивный.ReactiveCrudRepository + java .util.concurrent)?

Последние три дня я боролся за то, чтобы изучать и изучать и читать несколько учебных пособий, чтобы найти рекомендуемый способ или, по крайней мере, «северный путь» для сохранения данных, когда кто-то хочет использовать для этого CompletableFuture. Я мог бы достичь следующего кода, успешно работающего, но я не уверен, что делаю какие-то странные вещи.

По сути, я хочу использовать CompletableFuture, потому что я хочу связать будущее. Допустим, сначала сохраните в MongoDb, а если все сделано правильно, то затем «thenAcceptAsyn c» и, наконец, «thenCombine».

Ну, ReactiveCrudRepository.save возвращает Mono <>, и я должен подписаться, чтобы эффективно сохранить Это. Кроме того, Mono <>. Subscribe () возвращает dispose, тогда как c Я понимаю, что могу использовать для его отмены, скажем, если поток занимает слишком много времени, потому что MongoDb отсутствует, например, или любое другое исключение. Пока все хорошо.

Что мне неясно, так это то, что я не ошибаюсь в идее использования сохранения данных, которые блокируются асинхронным методом. Так как я пытаюсь разрешить «будущее» разрешение, я блокирую во время нижеописанного метода сохранения и полностью теряю преимущество сохранения в другом потоке и получения результата в будущем?

Правильное сохранение кода в MongoDb, но не ясно мне, если это действительно "не блокирующий" подход. Обратите внимание, что completetableFuture.get () прокомментирован, так как он мне не нужен в onder для эффективного сохранения моих данных

@Async("taskExecutor")
public void transferirDisposableReturnedSupplyAsync(Extrato e) throws InterruptedException, ExecutionException {
    CompletableFuture<Disposable> completableFuture = CompletableFuture
            .supplyAsync(() -> extratoRepository.save(e).subscribe());

    //completableFuture.get(); unnecessary since subscribe() above already saved it
}

На случай, если это уместно: Репозиторий:

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;

public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
    @Query("{ id: { $exists: true }}")
    Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}

AsyncConfiguration :

import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// The @EnableAsync annotation enables Spring’s ability to run @Async methods in a background thread pool. 
// The bean taskExecutor helps to customize the thread executor such as configuring number of threads for an application, queue limit size and so on. 
// Spring will specifically look for this bean when the server is started. 
// If this bean is not defined, Spring will create SimpleAsyncTaskExecutor by default.

@Configuration
@EnableAsync
public class AsyncConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        LOGGER.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("ExtratoThread-");
        executor.initialize();
        return executor;
    }
}

*** добавлено

import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';


@Injectable({
  providedIn: "root"
})
export class SseService {
  extratos: Extrato[] = [];
  constructor(private _zone: NgZone) { }

  getServerSentEvent(url: string): Observable<any> {
    this.extratos = [];
    return Observable.create(observer => {
      const eventSource = this.getEventSource(url);
      eventSource.onmessage = event => {
        this._zone.run(() => {
          let json = JSON.parse(event.data);
          this.extratos.push(new Extrato(json['id'], json['description'], json['value'], json['status']));
          observer.next(this.extratos);
        });
      };
      eventSource.onerror = (error) => {
        if (eventSource.readyState === 0) {
          console.log('The stream has been closed by the server.');
          eventSource.close();
          observer.complete();
        } else {
          observer.error('EventSource error: ' + error);
        }
      }

    });
  }
  private getEventSource(url: string): EventSource {
    return new EventSource(url);
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...