Spring WebFlux с MongoDB - регулирование клиентов SSE - PullRequest
0 голосов
/ 03 января 2019

Я работаю над простым чатом, работающим под управлением Spring Boot 2.1.1 с WebFlux, Reactor 3.2.3, Mongo 3.8.2 и Netty 4.1.31.

В каждой комнате чата есть 2 коллекции - сообщенияархив и ограниченная коллекция с текущими событиями (например, событие нового сообщения, индикаторы ввода пользователя и т. д.).Ограниченная коллекция содержит 100 элементов, и я использую метод tail () ReactiveMongoTemplate для извлечения последних событий.

Служба предоставляет 2 вида конечных точек для извлечения последних событий: SSE и для опроса.Я провел некоторое стресс-тестирование с 2000 одновременными пользователями, которые, кроме прослушивания чата, рассылали тонны событий.

Наблюдения:

  • опроскаждые 2 секунды вносит небольшой стресс в службу (~ 40% использования ЦП во время теста), и почти нет нагрузки на MongoDB (~ 4%)
  • прослушивание через SSE максимально увеличивает MongoDB (~ 90%), также подчеркивает службу (которая пытается использовать оставшиеся доступные ресурсы), но Mongo особенно борется, и в целом служба становится почти не отвечающей.

Наблюдение кажется очевидным, потому что, когда я подключилсячерез SSE во время теста, он почти мгновенно обновил меня, когда появилось новое событие - в основном SSE был в сотни раз более отзывчивым, чем опрос каждые 2 секунды.

Вопрос:

Учитывая, что клиент в конечном итоге является подписчиком (или, по крайней мере, я думаю, что это дается ограниченными знаниями), могу ли я каким-то образом снизить скоростьпубликации сообщений с помощью ReactiveMongoTemplate?Или как-то уменьшить спрос на новые события без необходимости делать это на стороне клиента?

Я пытался везти с буферизацией и кэшированием Flux, однако это вызвало еще больший стресс ...

Код:

// ChatRepository.java

private static final Query chatEventsQuery = new Query();

public Flux<ChatEvent> getChatEventsStream(String chatId) {
    return reactiveMongoTemplate.tail(
            chatEventsQuery,
            ChatEvent.class,
            chatId
    );
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvent(username))
            .map(event -> ServerSentEvent.<ChatEvent>builder()
                    .event(event.getType().getEventName())
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

// ChatRouter.java

RouterFunction<ServerResponse> routes(ChatHandler handler) {
    return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}

1 Ответ

0 голосов
/ 04 января 2019

Ответ: Вы делаете это, используя Flux.buffer метод.Затем поток будет отправлять события подписчикам массовым образом с определенной скоростью.

Код, который я разместил, имел две основные проблемы

  1. Учитывая, что несколько пользователей обычно слушают один чат, я реорганизовал ChatRepository, чтобы использовать преимущества «горячих», воспроизводимых потоков (теперь у меня 1 поток на чат вместо 1 потока на пользователя), который я храню в Caffeine кеш.Кроме того, я буферизирую их через короткие промежутки времени, чтобы избежать интенсивного использования ресурсов при передаче событий клиентам в занятых чатах.

  2. new Query(), который я использовал в ChatRepository, был избыточным.Я посмотрел на код ReactiveMongoTemplate и, если предоставляется ненулевой запрос, логика немного сложнее.Вместо этого лучше передать null методу ReactiveMongoTemplate tail().

Код пострефакторинга

// ChatRepository.java

public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
    return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
            .orElseGet(newCachedChatEventsStream(chatId))
            .autoConnect();
}

private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
    return () -> {
        ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
                null,
                ChatEvent.class,
                chatId
        ).buffer(Duration.ofMillis(chatEventsBufferInterval))
                .replay(chatEventsReplayCount);

        chatStreamsCache.put(chatId, chatEventsStream);

        return chatEventsStream;
    };
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvents(username))
            .map(event -> ServerSentEvent.<List<ChatEvent>>builder()
                    .event(CHAT_SSE_NAME)
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

После применения этих изменений служба работает хорошо даже с 3000 активных пользователей (JVM использует ~ 50% ЦП, Mongo ~ 7% в основном из-за большого количества вставок - потокине так заметно сейчас)

...