Как вызвать блок реактивного потока в WebFilter в проекте Spring WebFlux (Netty)? - PullRequest
0 голосов
/ 19 января 2019

У меня есть сценарий использования, когда я должен вызвать block () для операции сохранения пользователя в перехватчике запросов (WebFilter) внутри webfilter перед передачей на конечную точку контроллера.Это связано с тем, что webfilter должен сохранять пользователя с его собственными обновлениями (статистикой просмотра страниц) для объекта пользователя, а конечная точка контроллера должна сохранять пользователя с собственным обновлением в зависимости от того, что находится в части запроса в строке запроса.Сохранение пользователя веб-фильтра не может выполняться в своем собственном потоке, в противном случае сохранение пользователя конечной точки контроллера может быть перезаписано сохранением пользователя веб-фильтра.

В настоящее время с кодом, приведенным ниже, браузер постоянно ожидает ответа, в журнале написано «Начать обновление ...», но «Пользователь обновлен».никогда не регистрируется.

Это код, показывающий веб-фильтр:

package mypackage.conf;


import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseCookie;

import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

import lombok.extern.slf4j.Slf4j;
import mypackage.dao.PageViewRepository;
import mypackage.dao.UserRepository;
import mypackage.domain.PageView;
import mypackage.domain.User;


import mypackage.service.UserService;
import reactor.core.publisher.Mono;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.scheduling.annotation.EnableAsync;
@Configuration

@Slf4j
@ComponentScan(basePackages = "mypackage")
@EnableReactiveMongoRepositories(basePackages = "mypackage")
public class WebConfig implements WebFluxConfigurer {

    @Autowired
    @Lazy
    private UserRepository userRepository;

    @Autowired
    @Lazy
    private UserService userService;

    @Autowired
    @Lazy
    private PageViewRepository pageViewRepository;

    @Autowired
    @Lazy
    JwtProvider jwtProvider;

    @Value("${images..base}")
    String baseSytemImagePath;




    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {

        registry.addResourceHandler("/images/**").addResourceLocations("file:"+baseSytemImagePath+"/");

    }

    @Bean
    public WebFilter filter() {
        return (exchange, chain) -> {
            ServerHttpRequest req = exchange.getRequest();
            String uri = req.getURI().toString();

            String token;
            // get token from http cookie 
            // (code omitted)

            return userRepository.findByToken(token)
                    .map(user -> process(exchange, uri, token, user))

                    // if user not found by token
                    .switchIfEmpty(
                            userRepository.save(User.builder().createdDate(LocalDateTime.now())
                                    .token(jwtProvider.genToken("run23", UUID.randomUUID().toString())).build())
                                    .map(user -> process(exchange, uri, user.getToken(), user)))
                    .then(chain.filter(exchange));

        };
    }

    private User process(ServerWebExchange exchange, String uri, String token, User user) {

        log.info("Start updating...");

        PageView pg = PageView.builder().createdDate(LocalDateTime.now()).URL(uri).build();
        pageViewRepository.save(pg)
                .flatMap(user::addPageView)
                .blockOptional()
                .ifPresent(u -> userRepository.save(u).log("save user pageview").block());

        log.info("User updated.");

        return user;
    }


}

Соответствующее содержимое файла сборки Gradle:

buildscript {
    ext {

        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()

    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

dependencies {

    compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
    compile('org.springframework.boot:spring-boot-starter-security')
    compile('org.springframework.boot:spring-boot-starter-thymeleaf')

    compile('org.springframework.boot:spring-boot-starter-webflux')

    compile('org.springframework.security:spring-security-oauth2-client')
    compile('org.springframework.security.oauth:spring-security-oauth2:2.3.4.RELEASE')
    runtime('org.springframework.boot:spring-boot-devtools')
    compileOnly('org.projectlombok:lombok')
    compile "org.springframework.security:spring-security-jwt:1.0.9.RELEASE"
    compile "io.jsonwebtoken:jjwt:0.9.0"

    testCompile('org.springframework.boot:spring-boot-starter-test')

    testCompile('io.projectreactor:reactor-test')

    compile('com.fasterxml.jackson.core:jackson-databind')
}

Может ли какой-нибудь эксперт из сообщества Spring помочь вв связи с этим, когда я схожу с ума, чтобы решить эту загадку?

Кроме того, что заставляет браузер вечно ждать ответа при вызове блока?

...