Как выполнить последовательность операций и убедиться, что одна операция завершена до следующей в веб-приложении Spring Reactor? - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть веб-приложение Spring Boot 2, в котором мне нужно идентифицировать посетителя сайта по cookie и собирать статистику просмотра страниц.Поэтому мне нужно перехватить каждый веб-запрос.Код, который я должен был написать, более сложен, чем обратный вызов в ад (сама проблема, которую должен был решить реактор Spring).

Вот код:

package mypack.conf;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.http.HttpCookie;
import org.springframework.http.ResponseCookie;
import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import mypack.dao.PageViewRepository;
import mypack.dao.UserRepository;
import mypack.domain.PageView;
import mypack.domain.User;
import mypack.security.JwtProvider;

import reactor.core.publisher.Mono;
@Configuration


@ComponentScan(basePackages = "mypack")
@EnableReactiveMongoRepositories(basePackages = "mypack")
public class WebConfig implements WebFluxConfigurer {

    @Autowired
    @Lazy
    private UserRepository userRepository;

    @Autowired
    @Lazy
    private PageViewRepository pageViewRepository;


    @Autowired
    @Lazy
    JwtProvider jwtProvider;


    @Bean
    public WebFilter sampleWebFilter()  {
        return new WebFilter() {

            @Override
            public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {

                String uri = exchange.getRequest().getURI().toString();
                String path = exchange.getRequest().getPath().pathWithinApplication().value();


                HttpCookie  cookie = null;
                String token = "";
                Map<String, List<HttpCookie>> cookies = exchange.getRequest().getCookies();


                try {
                    if((exchange.getRequest().getCookies().containsKey("_token") )
                            &&  (exchange.getRequest().getCookies().getFirst("_token"))!=null  ) {

                        cookie = exchange.getRequest().getCookies().getFirst("_token");
                        token = cookie.getValue();


                        return userRepository.findByToken(token).map(user -> {

                                exchange.getAttributes().put("_token", user.getToken());


                                PageView pg = PageView.builder().createdDate(LocalDateTime.now()).URL(uri).build();
                                pageViewRepository.save(pg).subscribe(pg1 -> {user.getPageviews().add(pg1); });

                                userRepository.save(user).subscribe();
                                    return user;
                            })


                            .flatMap(user-> chain.filter(exchange)); // ultimately this step executes regardless user exist or not

                    // handle case when brand new user first time visits website    
                    } else {
                        token = jwtProvider.genToken("guest", UUID.randomUUID().toString());
                        User user = User.builder().createdDate(LocalDateTime.now()).token(token).emailId("guest").build();
                        userRepository.save(user).subscribe();
                        exchange.getResponse().getCookies().remove("_token");

                        ResponseCookie rcookie  = ResponseCookie.from("_token", token).httpOnly(true).build();
                        exchange.getResponse().addCookie(rcookie);
                        exchange.getAttributes().put("_token", token);

                    }

                } catch (Exception e) {

                    e.printStackTrace();
                }



                return chain.filter(exchange);
            } // end of  Mono<Void> filter method
        }; // end of New WebFilter (anonymous class)
    }

}

Другие соответствующие классы:

@Repository
public interface PageViewRepository extends   ReactiveMongoRepository<PageView, String>{

    Mono<PageView> findById(String id);

}


@Repository
public interface UserRepository extends   ReactiveMongoRepository<User, String>{

    Mono<User> findByToken(String token);

}





@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class User {

    @Id
    private String id;
    private String token;


    @Default
    private LocalDateTime createdDate = LocalDateTime.now();

    @DBRef
    private List<PageView> pageviews;

}



Data
@Document
@Builder
public class PageView {
    @Id
    private String id;

    private String URL;

    @Default
    private LocalDateTime createdDate = LocalDateTime.now();
}

Соответствующая часть файла Gradle:

buildscript {
    ext {

        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()

    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

dependencies {

    compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
    compile('org.springframework.boot:spring-boot-starter-security')
    compile('org.springframework.boot:spring-boot-starter-thymeleaf')

    compile('org.springframework.boot:spring-boot-starter-webflux')

    compile('org.springframework.security:spring-security-oauth2-client')
    compile('org.springframework.security.oauth:spring-security-oauth2:2.3.4.RELEASE')
    runtime('org.springframework.boot:spring-boot-devtools')
    compileOnly('org.projectlombok:lombok')
    compile "org.springframework.security:spring-security-jwt:1.0.9.RELEASE"
    compile "io.jsonwebtoken:jjwt:0.9.0"

    testCompile('org.springframework.boot:spring-boot-starter-test')

    testCompile('io.projectreactor:reactor-test')

    compile('com.fasterxml.jackson.core:jackson-databind')
}

Проблеманаходится в следующих строках:

PageView pg = PageView.builder (). creationDate (LocalDateTime.now ()). URL (uri) .build ();pageViewRepository.save (pg) .subscribe (pg1 -> {user.getPageviews (). add (pg1);});

, которая вешает браузер (продолжает ждать ответа).

В основном я хочу вот что: не должен использовать block (), который даже не работает в коде веб-фильтра, так как block также приводит к зависанию браузера.Сохранить просмотр страницы в Монго БД.После того, как оно сохранено, просмотр страницы имеет действительный идентификатор mongodb, который необходимо сохранить в качестве ссылки в списке просмотров пользователя.Поэтому только после того, как он будет сохранен в БД, следующим шагом будет обновление списка просмотров страниц пользователя.Следующим шагом является сохранение обновленного пользователя без влияния на методы контроллера нисходящего потока, которые также могут обновлять пользователя и, возможно, также должны сохранять пользователя.Все это должно работать в заданном контексте WebFilter.

Как решить эту проблему?

В предоставленном решении необходимо убедиться, что пользователь сохранен в веб-фильтре, прежде чем переходить к действиям контроллера, некоторые из которых такжесохраняет пользователя с другими значениями из параметров строки запроса.

Ответы [ 3 ]

0 голосов
/ 06 января 2019

Если я вас правильно понимаю, вам нужно асинхронно выполнять длинные операции с базой данных, чтобы предотвратить блокировку фильтра (и самого запроса)?

В этом случае я бы порекомендовал следующее решение, которое работает дляЯ:

@Bean
public WebFilter filter() {
    return (exchange, chain) -> {
        ServerHttpRequest req = exchange.getRequest();
        String uri = req.getURI().toString();
        log.info("[i] Got request: {}", uri);

        var headers = req.getHeaders();
        List<String> tokenList = headers.get("token");

        if (tokenList != null && tokenList.get(0) != null) {
            String token = tokenList.get(0);
            log.info("[i] Find a user by token {}", token);
            return userRepo.findByToken(token)
                    .map(user -> process(exchange, uri, token, user))
                    .then(chain.filter(exchange));
        } else {
            String token = UUID.randomUUID().toString();
            log.info("[i] Create a new user with token {}", token);
            return userRepo.save(new User(token))
                    .map(user -> process(exchange, uri, token, user))
                    .then(chain.filter(exchange));
        }
    };
}

Здесь я немного изменяю вашу логику и беру значение токена из соответствующего заголовка (а не из файлов cookie), чтобы упростить мою реализацию.

Итак, если токен присутствует, мы пытаемся найти его пользователя.Если токена нет, мы создаем нового пользователя.Если пользователь найден или успешно создан, то вызывается метод process.После этого, независимо от результата, мы возвращаем chain.filter(exchange).

Метод process помещает значение токена в соответствующий атрибут запроса и асинхронно вызывает метод updateUserStat из userService:

private User process(ServerWebExchange exchange, String uri, String token, User user) {
    exchange.getAttributes().put("_token", token);
    userService.updateUserStat(uri, user); // async call
    return user;
}

Служба пользователя:

@Slf4j
@Service
public class UserService {

    private final UserRepo userRepo;
    private final PageViewRepo pageViewRepo;

    public UserService(UserRepo userRepo, PageViewRepo pageViewRepo) {
        this.userRepo = userRepo;
        this.pageViewRepo = pageViewRepo;
    }

    @SneakyThrows
    @Async
    public void updateUserStat(String uri, User user) {
        log.info("[i] Start updating...");
        Thread.sleep(1000);
        pageViewRepo.save(new PageView(uri))
                .flatMap(user::addPageView)
                .blockOptional()
                .ifPresent(u -> userRepo.save(u).block());
        log.info("[i] User updated.");
    }
}

Я добавил небольшую задержку для целей тестирования, чтобы убедиться, что запросы работают без задержки, независимо от продолжительности этого метода.

Случай, когда по токену обнаружен пользователь:

2019-01-06 18:25:15.442  INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=1000
2019-01-06 18:25:15.443  INFO 4992 --- [ctor-http-nio-3] : [i] Find a user by token 84b0f7ec-670c-4c04-8a7c-b692752d7cfa
2019-01-06 18:25:15.444 DEBUG 4992 --- [ctor-http-nio-3] : Created query Query: { "token" : "84b0f7ec-670c-4c04-8a7c-b692752d7cfa" }, Fields: { }, Sort: { }
2019-01-06 18:25:15.445 DEBUG 4992 --- [ctor-http-nio-3] : find using query: { "token" : "84b0f7ec-670c-4c04-8a7c-b692752d7cfa" } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:25:15.457  INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:25:15.457  INFO 4992 --- [         task-3] : [i] Start updating...
2019-01-06 18:25:15.458 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:25:16.459 DEBUG 4992 --- [         task-3] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-06 18:25:16.476 DEBUG 4992 --- [         task-3] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-06 18:25:16.479  INFO 4992 --- [         task-3] : [i] User updated.

Здесь мы видим, что обновление пользователя выполняется в независимом потоке task-3 после того, как у пользователя уже есть результатзапроса 'get all users'.

Случай, когда токен отсутствует и пользователь создан:

2019-01-06 18:33:54.764  INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=763
2019-01-06 18:33:54.764  INFO 4992 --- [ctor-http-nio-3] : [i] Create a new user with token d9bd40ea-b869-49c2-940e-83f1bf79e922
2019-01-06 18:33:54.765 DEBUG 4992 --- [ctor-http-nio-3] : Inserting Document containing fields: [token, _class] in collection: user
2019-01-06 18:33:54.776  INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:33:54.777  INFO 4992 --- [         task-4] : [i] Start updating...
2019-01-06 18:33:54.777 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:33:55.778 DEBUG 4992 --- [         task-4] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-06 18:33:55.792 DEBUG 4992 --- [         task-4] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-06 18:33:55.795  INFO 4992 --- [         task-4] : [i] User updated.

Случай, когда токен присутствует, но пользователь не найден:

2019-01-06 18:35:40.970  INFO 4992 --- [ctor-http-nio-3] : [i] Got request: http://localhost:8080/users?test=150
2019-01-06 18:35:40.970  INFO 4992 --- [ctor-http-nio-3] : [i] Find a user by token 184b0f7ec-670c-4c04-8a7c-b692752d7cfa
2019-01-06 18:35:40.972 DEBUG 4992 --- [ctor-http-nio-3] : Created query Query: { "token" : "184b0f7ec-670c-4c04-8a7c-b692752d7cfa" }, Fields: { }, Sort: { }
2019-01-06 18:35:40.972 DEBUG 4992 --- [ctor-http-nio-3] : find using query: { "token" : "184b0f7ec-670c-4c04-8a7c-b692752d7cfa" } fields: Document{{}} for class: class User in collection: user
2019-01-06 18:35:40.977  INFO 4992 --- [ntLoopGroup-2-2] : [i] Get all users...
2019-01-06 18:35:40.978 DEBUG 4992 --- [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user

Мой демонстрационный проект: sb-реактивный-фильтр-демо

0 голосов
/ 20 января 2019

Еще один вариант, который создает просмотр страницы и обновляет пользователя в веб-фильтре неблокирующим образом перед передачей запроса в контроллер:

@Bean
public WebFilter filter() {
    return (exchange, chain) -> {
        ServerHttpRequest req = exchange.getRequest();
        String uri = req.getURI().toString();
        log.info("[i] Web Filter: received the request: {}", uri);

        var headers = req.getHeaders();
        List<String> tokenList = headers.get("token");

        if (tokenList != null && tokenList.get(0) != null) {
            String token = tokenList.get(0);
            Mono<User> foundUser = userRepo
                    .findByToken(token)
                    .doOnNext(user -> log.info("[i] Web Filter: {} has been found", user));
            return updateUserStat(foundUser, exchange, chain, uri);
        } else {
            String token = UUID.randomUUID().toString();
            Mono<User> createdUser = userRepo
                    .save(new User(token))
                    .doOnNext(user -> log.info("[i] Web Filter: a new {} has been created", user));
            return updateUserStat(createdUser, exchange, chain, uri);
        }
    };
}
private Mono<Void> updateUserStat(Mono<User> userMono, ServerWebExchange exchange, WebFilterChain chain, String uri) {
    return userMono
            .doOnNext(user -> exchange.getAttributes().put("_token", user.getToken()))
            .doOnNext(u -> {
                String token = exchange.getAttribute("_token");
                log.info("[i] Web Filter: token attribute has been set to '{}'", token);
            })
            .flatMap(user -> pageViewRepo.save(new PageView(uri)).flatMap(user::addPageView).flatMap(userRepo::save))
            .doOnNext(user -> {
                int numberOfPages = 0;
                List<PageView> pageViews = user.getPageViews();
                if (pageViews != null) {
                    numberOfPages = pageViews.size();
                }
                log.info("[i] Web Filter: {} has been updated. Number of pages: {}", user, numberOfPages);
            })
            .then(chain.filter(exchange));
}

Этот код дает следующие результаты:

1) Токен отсутствует: создайте нового пользователя, создайте просмотр страницы, обновите нового пользователя, передайте запрос контроллеру

2019-01-20 14:39:10.033 [ctor-http-nio-3] : [i] Web Filter: received the request: http://localhost:8080/users?test=784
2019-01-20 14:39:10.110 [ctor-http-nio-3] : Inserting Document containing fields: [token, _class] in collection: user
2019-01-20 14:39:10.206 [ntLoopGroup-2-2] : [i] Web Filter: a new User(id=5c446bee24c86426ac6c0ae5, token=fba944cd-decb-4923-9757-724da5a60061) has been created
2019-01-20 14:39:10.212 [ntLoopGroup-2-2] : [i] Web Filter: token attribute has been set to 'fba944cd-decb-4923-9757-724da5a60061'
2019-01-20 14:39:11.227 [     parallel-1] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-20 14:39:11.242 [ntLoopGroup-2-2] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-20 14:39:11.256 [ntLoopGroup-2-2] : [i] Web Filter: User(id=5c446bee24c86426ac6c0ae5, token=fba944cd-decb-4923-9757-724da5a60061) has been updated. Number of pages: 1
2019-01-20 14:39:11.289 [ntLoopGroup-2-2] : [i] Controller: handling 'get all users' request. Token attribute is 'fba944cd-decb-4923-9757-724da5a60061'
2019-01-20 14:39:11.369 [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class io.github.cepr0.demo.User in collection: user

2) Токен присутствует: найдите существующийпользователь, создайте просмотр страницы, обновите пользователя, передайте запрос контроллеру

2019-01-20 14:51:21.983 [ctor-http-nio-3] : [i] Web Filter: received the request: http://localhost:8080/users?test=538
2019-01-20 14:51:22.074 [ctor-http-nio-3] : Created query Query: { "token" : "b613b810-cc36-4961-ad2e-db44f52cd2dd" }, Fields: { }, Sort: { }
2019-01-20 14:51:22.092 [ctor-http-nio-3] : find using query: { "token" : "b613b810-cc36-4961-ad2e-db44f52cd2dd" } fields: Document{{}} for class: class User in collection: user
2019-01-20 14:51:22.102 [ntLoopGroup-2-2] : [i] Web Filter: User(id=5c434c2eb338ac3530cbd56d, token=b613b810-cc36-4961-ad2e-db44f52cd2dd) has been found
2019-01-20 14:51:22.102 [ntLoopGroup-2-2] : [i] Web Filter: token attribute has been set to 'b613b810-cc36-4961-ad2e-db44f52cd2dd'
2019-01-20 14:51:23.103 [     parallel-2] : Inserting Document containing fields: [URL, createdDate, _class] in collection: pageView
2019-01-20 14:51:23.115 [ntLoopGroup-2-2] : Saving Document containing fields: [_id, token, pageViews, _class]
2019-01-20 14:51:23.117 [ntLoopGroup-2-2] : [i] Web Filter: User(id=5c434c2eb338ac3530cbd56d, token=b613b810-cc36-4961-ad2e-db44f52cd2dd) has been updated. Number of pages: 13
2019-01-20 14:51:23.118 [ntLoopGroup-2-2] : [i] Controller: handling 'get all users' request. Token attribute is 'b613b810-cc36-4961-ad2e-db44f52cd2dd'
2019-01-20 14:51:23.119 [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user

3) Токен присутствует, но пользователь не найден: передайте запрос контроллеру

2019-01-20 14:52:41.842 [ctor-http-nio-3] : [i] Web Filter: received the request: http://localhost:8080/users?test=513
2019-01-20 14:52:41.844 [ctor-http-nio-3] : Created query Query: { "token" : "-b613b810-cc36-4961-ad2e-db44f52cd2dd" }, Fields: { }, Sort: { }
2019-01-20 14:52:41.845 [ctor-http-nio-3] : find using query: { "token" : "-b613b810-cc36-4961-ad2e-db44f52cd2dd" } fields: Document{{}} for class: class User in collection: user
2019-01-20 14:52:41.850 [ntLoopGroup-2-2] : [i] Controller: handling 'get all users' request. Token attribute is 'null'
2019-01-20 14:52:41.850 [ntLoopGroup-2-2] : find using query: { } fields: Document{{}} for class: class User in collection: user

Демо: sb-реактивный-фильтр-демо (ветка: update-user-in-web-filter)

0 голосов
/ 09 декабря 2018

Для сбора статистики по просмотру страниц я предлагаю изменить стратегию и использовать вместо нее Актуатор и Микрометр:

  1. Добавить зависимость привода к вашему проекту
  2. Показать соответствующие конечные точки (здесь metrics)
  3. Перейдите на /actuator/metrics и выберите показатель для HTTP-запросов к серверу (см. справочную документацию ).

Микрометр предлагает гораздо больше и помогает вам правильно подбирать метрики, такие как: учет пауз ГХ при измерении времени, предоставление гистограмм / процентилей / ... и т. Д.

...