Я работаю над простым чатом, работающим под управлением Spring Boot 2.1.1 с WebFlux, Reactor 3.2.3, Mongo 3.8.2 и Netty 4.1.31.
В каждой комнате чата есть 2 коллекции - сообщенияархив и ограниченная коллекция с текущими событиями (например, событие нового сообщения, индикаторы ввода пользователя и т. д.).Ограниченная коллекция содержит 100 элементов, и я использую метод tail () ReactiveMongoTemplate для извлечения последних событий.
Служба предоставляет 2 вида конечных точек для извлечения последних событий: SSE и для опроса.Я провел некоторое стресс-тестирование с 2000 одновременными пользователями, которые, кроме прослушивания чата, рассылали тонны событий.
Наблюдения:
- опроскаждые 2 секунды вносит небольшой стресс в службу (~ 40% использования ЦП во время теста), и почти нет нагрузки на MongoDB (~ 4%)
- прослушивание через SSE максимально увеличивает MongoDB (~ 90%), также подчеркивает службу (которая пытается использовать оставшиеся доступные ресурсы), но Mongo особенно борется, и в целом служба становится почти не отвечающей.
Наблюдение кажется очевидным, потому что, когда я подключилсячерез SSE во время теста, он почти мгновенно обновил меня, когда появилось новое событие - в основном SSE был в сотни раз более отзывчивым, чем опрос каждые 2 секунды.
Вопрос:
Учитывая, что клиент в конечном итоге является подписчиком (или, по крайней мере, я думаю, что это дается ограниченными знаниями), могу ли я каким-то образом снизить скоростьпубликации сообщений с помощью ReactiveMongoTemplate?Или как-то уменьшить спрос на новые события без необходимости делать это на стороне клиента?
Я пытался везти с буферизацией и кэшированием Flux, однако это вызвало еще больший стресс ...
Код:
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}