Использование функции elapsed () на Mono? - PullRequest
0 голосов
/ 10 июля 2019

Я пытаюсь получить время выполнения для чтения из redis в реактивном программировании, при просмотре документов я вижу, что метод elapsed() будет выполнять тот же код и реализован как показано ниже.

Flux.fromIterable(getActions(httpHeaders))
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(actionFact -> methodToReadFromCache(actionFact))
                .sequential();

public Mono<ActionFact> methodToReadFromCache(actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
                .flatMap(cacheKey ->
                  redisOperations.hasKey(key)
                                .flatMap(aBoolean -> {
                                    if (aBoolean) {
                                        return redisOperations.opsForValue().get(cacheKey);
                                    }
                                    return authzService.getRolePermissions(actionFact)
                                            .flatMap(policySetResponse ->
                                                    //save in cache
                                            );
                                })
                                .elapsed()
                                .flatMap(lambda -> {
                                    LOG.info("cache/service processing key:{}, time:{}", key, lambda.getT1());
                                    return Mono.just(lambda.getT2());
                                });

Вывод:

cache/service processing key:KEY1, time:3 
cache/service processing key:KEY2, time:4 
cache/service processing key:KEY3, time:18 
cache/service processing key:KEY4, time:34 
cache/service processing key:KEY5, time:46 
cache/service processing key:KEY6, time:57 
cache/service processing key:KEY7, time:70 
cache/service processing key:KEY8, time:81 
cache/service processing key:KEY9, time:91 
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159

Я ожидаю, что время, затрачиваемое на каждый запрос кеша, будет <5 миллисекунд, как первый и второй запрос, но это не так.<code>elapsed() добавляет текущее время выборки кумулятивному?Насколько я понимаю, каждый элемент, испускаемый из потока является независимым?

Ответы [ 2 ]

0 голосов
/ 12 июля 2019

Mono#elapsed измеряет время между подпиской на Mono и моментом, когда Mono испускает элемент (onNext).

Что вызывает подписку и запуск таймерав вашем случае это внешнее распараллеливание flatMap, которое вызывает methodToReadFromCache.

Что вызывает onNext и, следовательно, что синхронизировано, это комбинация hasKey и части if / else (redisOperations.opsForValue().get(cacheKey)vs authzService).

Внешняя плоская карта должна иметь как минимум столько же таймеров, сколько имеется ЦП, поскольку мы находимся в параллельном режиме.на то, что что-то либо блокирует, либо имеет ограниченные возможности.Например, может ли быть так, что redisTemplate может обрабатывать только несколько ключей одновременно?

0 голосов
/ 10 июля 2019

согласно документации

Я хочу связать выбросы с периодом (Tuple2<Long, T>) измеренным ...

  • с момента подписки: истек

  • с незапамятных времен (ну, компьютерное время): отметка времени

elapsed измеряется время с момента подписки.Таким образом, вы подписываетесь, и он начинает излучать, время будет увеличиваться, чем дольше вы подписались на услугу.

официальные документы

...