У меня есть следующие настройки для демонстрационного приложения:
- mongodb, содержащий 2 коллекции: 1 с криптовалютами и 1 с курсами обмена этих криптовалют
- весенний проект webflux для полученияобновления этих обменных курсов в режиме реального времени с использованием событий Server Sent
У меня есть служба, которая возвращает Flux
из List<CryptoCurrencyRateDTO>
на основе валют, присутствующих в коллекции криптовалют.Я генерирую случайный обменный курс для каждой из этих валют и транслирую их веб-клиенту.
Сервис такой:
@Service
public class CryptoCurrencyRateService {
@Autowired private CryptoCurrencyRateRepository rateRepository;
@Autowired private CryptoCurrencyRepository currencyRepository;
// constructor
public Flux<List<CryptoCurrencyRateDTO>> realtimeRates() {
return currencyRepository.findAll()
.map(CryptoCurrency::getSymbol)
.flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
.zipWith(
Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
(rate, timestamp) -> new CryptoCurrencyRate(rate.getSymbol(), timestamp, randomRateBasedOnPrevious )
)
.flatMap(rateRepository::save)
.map(rateMapper::toDto)
.collectList()
.delayElement(Duration.ofSeconds(5))
.repeat();
}
}
CryptoCurrencyRateRepository
выглядит следующим образом:
@Repository
public interface CryptoCurrencyRateRepository extends ReactiveMongoRepository<CryptoCurrencyRate, String> {
Mono<CryptoCurrencyRate> findTopBySymbolOrderByTimestamp(String symbol);
}
Однако после вызова .flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
я получаю только Flux
, содержащий 1 элемент, в то время как я думал, что получу Flux
, содержащий максимальную ставку для каждого символа из вызова currencyRepository.findAll().map(CryptoCurrency::getSymbol)
, потому что моя криптовалютаКоллекция содержит 3 валюты.
Когда я смотрю в журнале, я вижу, что вызов findTopBySymbolOrderByTimestamp
выполняется 3 раза
2018-11-16 16:04:33.626 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "BTC" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.627 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "ETH" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.629 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate : find using query: { "symbol" : "XRP" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate