Поток реактора Java не отображается в соответствии с ожиданиями - PullRequest
0 голосов
/ 16 ноября 2018

У меня есть следующие настройки для демонстрационного приложения:

  • mongodb, содержащий 2 коллекции: 1 с криптовалютами и 1 с курсами обмена этих криптовалют
  • весенний проект webflux для полученияобновления этих обменных курсов в режиме реального времени с использованием событий Server Sent

У меня есть служба, которая возвращает Flux из List<CryptoCurrencyRateDTO> на основе валют, присутствующих в коллекции криптовалют.Я генерирую случайный обменный курс для каждой из этих валют и транслирую их веб-клиенту.

Сервис такой:

@Service
public class CryptoCurrencyRateService {
  @Autowired private CryptoCurrencyRateRepository rateRepository;
  @Autowired private CryptoCurrencyRepository currencyRepository;

  // constructor

  public Flux<List<CryptoCurrencyRateDTO>> realtimeRates() {
    return currencyRepository.findAll()
      .map(CryptoCurrency::getSymbol)
      .flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
      .zipWith(
        Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
        (rate, timestamp) -> new CryptoCurrencyRate(rate.getSymbol(), timestamp, randomRateBasedOnPrevious )
      )
      .flatMap(rateRepository::save)
      .map(rateMapper::toDto)
      .collectList()
      .delayElement(Duration.ofSeconds(5))
      .repeat();
  }
}

CryptoCurrencyRateRepository выглядит следующим образом:

@Repository
public interface CryptoCurrencyRateRepository extends ReactiveMongoRepository<CryptoCurrencyRate, String> {
    Mono<CryptoCurrencyRate> findTopBySymbolOrderByTimestamp(String symbol);
}

Однако после вызова .flatMap(rateRepository::findTopBySymbolOrderByTimestamp) я получаю только Flux, содержащий 1 элемент, в то время как я думал, что получу Flux, содержащий максимальную ставку для каждого символа из вызова currencyRepository.findAll().map(CryptoCurrency::getSymbol), потому что моя криптовалютаКоллекция содержит 3 валюты.

Когда я смотрю в журнале, я вижу, что вызов findTopBySymbolOrderByTimestamp выполняется 3 раза

2018-11-16 16:04:33.626 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate       : find using query: { "symbol" : "BTC" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.627 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate       : find using query: { "symbol" : "ETH" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.629 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate       : find using query: { "symbol" : "XRP" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate

Ответы [ 2 ]

0 голосов
/ 22 ноября 2018

Похоже, мои ожидания оправдались. Единственной ошибкой было то, что не все объекты были сохранены в Монго, потому что у них не было идентификатора.

0 голосов
/ 21 ноября 2018

Я не могу воспроизвести вашу проблему.Вот как я имитирую это

public static void main(String[] args) {
    Flux<String> stringFlux = Flux.fromStream(Stream.of("a", "b", "c"));
    System.out.println(realtimeRates(stringFlux).blockFirst());
}

static Flux<List<String>> realtimeRates(Flux<String> list) {
    Flux<String> symbols = list.map(Scratch::getSymbol);
    Flux<String> topRates = symbols.flatMap(Scratch::findTopBySymbolOrderByTimestamp);
    Flux<String> zip = topRates.zipWith(
        Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
        (rate, timestamp) -> rate + timestamp.toString());
    Mono<List<String>> listMono = zip.collectList();
    Mono<List<String>> delayElement = listMono.delayElement(Duration.ofSeconds(5));
    Flux<List<String>> repeat = delayElement.repeat();
    return repeat;
}

static Mono<String> findTopBySymbolOrderByTimestamp(String symbol) {
    return Mono.just("other-" + symbol);
}

static String getSymbol(String rate) {
    return rate.toLowerCase();
}

Как видите, вы получите что-то вроде [other-a1542821666133, other-b1542821666133, other-c1542821666133].

Как вы проверяете результат плоской карты?имейте в виду, что если вы сделаете это с помощью методов blockFirst() или blockLast(), вы получите только один элемент, поскольку он Flux<String> (проверьте переменную topRates в приведенном выше коде)

...