Вызовите асинхронную службу, которая возвращает DeferredResults, несколько раз без увеличения времени выполнения - PullRequest
0 голосов
/ 27 мая 2018

мои приложения должны иметь 2 основных конечных точки: push , pull для отправки и выборки данных.

Операция извлечения должна работать асинхронно и приводить к результату DeferredResult.Когда пользователь вызывает услугу извлечения из состояния покоя, новый DefferedResult создается и сохраняется в Map<Long, DefferedResult> results = new ConcurrentHashMap<>(), где ожидает новые данные или пока не истечет время ожидания.

Выдвиньте пользователя вызова операции поверх отдыха, и эта операция проверяет картурезультатов для получателя данных, выдвинутых этой операцией.Когда карта содержит результат получателя, эти данные устанавливаются на его результат, возвращается DefferedResult.

Вот базовый код:

@Service
public class FooServiceImpl {
    Map<Long, DefferedResult> results = new ConcurrentHashMap<>();

    @Transactional
    @Override
    public DeferredResult<String> pull(Long userId) {
        // here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
        DeferredResult<String> newResult = new DeferredResult<>(5000L);
        results.putIfAbsent(userId, newResult);
        newResult.onCompletion(() -> results.remove(userId));

        // if (data != null)
        //      newResult.setResult(data);

        return newResult;
    }

    @Transactional
    @Override
    public void push(String data, Long recipientId) {
        // fooRepository.save(data, recipientId);
        if (results.containsKey(recipientId)) {
            results.get(recipientId).setResult(data);
        }
    }
}

Код работает, как я ожидал, проблема заключается в том, что следует такжеработает для нескольких пользователей.Я предполагаю, что максимальное количество активных пользователей, которые будут вызывать операцию вытягивания, будет максимум 1000. Поэтому каждый вызов вытягивания занимает максимум 5 секунд, как я установил в DefferedResult, но это не так.

Как вы можете видеть на изображении, если яНемедленно вызовите оставшуюся операцию извлечения из моего клиента javascript, и вы увидите, что задачи будут выполняться последовательно, а не одновременно.Задачи, которые я выполнял в последнюю очередь, занимают около 25 секунд, но мне нужно, чтобы, когда 1000 пользователей одновременно выполняли операцию вытягивания, эта операция занимала максимум 5 секунд + задержка.

enter image description here

Как настроить мое приложение для одновременного выполнения этих задач и обеспечить выполнение каждой задачи примерно 5 секунд или меньше (когда другой пользователь отправляет что-то ожидающему пользователю)?Я попытался добавить эту конфигурацию в файл свойств:

server.tomcat.max-threads=1000

, а также эту конфигурацию:

@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {

    @Override
    protected AsyncTaskExecutor getTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1000);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

Но это не помогло, все тот же результат.Можете ли вы помочь мне настроить его, пожалуйста?Спасибо за совет.

РЕДАКТИРОВАТЬ:

Вот как я называю эту услугу с угловой:

this.http.get<any>(this.url, {params})
  .subscribe((data) => {
    console.log('s', data);
  }, (error) => {
    console.log('e', error);
  });

Когда я пытался вызвать ее несколько разс чистым кодом JS, подобным этому:

function httpGet()
{
    var xmlHttp = new XMLHttpRequest();
    xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
    xmlHttp.send( null );
    return xmlHttp.responseText;
}
setInterval(httpGet, 500);

он будет выполнять каждый запрос на запрос намного быстрее (около 7 секунд).Я ожидал, что увеличение вызвано вызовом базы данных в сервис, но это все же лучше, чем 25 сек.Неужели у меня что-то не так с вызовом этой службы в angular?

EDIT 2:

Я попробовал другую форму тестирования, и вместо браузера я использовал jMeter.Я выполняю 100 запросов в 100 потоках, и вот результат:

enter image description here

Как видите, запросы будут продолжаться до 10, а после достижения 50 запросов приложениеисключение выброса:

java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
    at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause

Я также комментирую код, в котором я использую репозитории, чтобы убедиться, что с базой данных ничего нет, и тот же результат.ТАКЖЕ я устанавливаю уникальный идентификатор пользователя для каждого запроса с классом AtomicLong.

РЕДАКТИРОВАТЬ 3:

Я выясняю, когда я также комментирую @Transactional все работает отлично!Можете ли вы рассказать мне, как настроить транзакции Spring для большого количества операций без увеличения задержки?

Я добавил spring.datasource.maximumPoolSize=1000, чтобы увеличить размер пула, что, я думаю, должно, поэтому единственная проблема заключается в том, как ускорить методы с помощью@ Transactional.

Каждый вызов метода pull аннотируется @Transactional, потому что мне нужно сначала загрузить данные из базы данных и проверить, есть ли новые данные, потому что да, мне не нужно создавать результат отложенного ожидания.Методы push также должны быть аннотацией с @Transaction, потому что там мне нужно сначала сохранить полученные данные в базе данных, а затем установить это значение для ожидающих результатов.Для своих данных я использую Postgres.

Ответы [ 3 ]

0 голосов
/ 01 июня 2018

Как уже упоминали многие ребята, это неправильный способ проверить производительность.Вы просили делать автоматические запросы в определенный период времени, как вы делали в XMLHttpRequest.Вы можете использовать interval из Observable как:

import {Observable} из "rxjs / Observable";

import {Subscription} из "rxjs / Subscription";

private _intervalSubscription: Subscription;

ngOnInit() {
    this._intervalSubscription = Observable.interval(500).subscribe(x => {
        this.getDataFromServer();
    });
}

ngOnDestroy(): void {
    this._intervalSubscription.unsubscribe();
}

getDataFromServer() {
    // do your network call
    this.http.get<any>(this.url, {params})
                .subscribe((data) => {
                    console.log('s', data);
                }, (error) => {
                    console.log('e', error);
                }); 
}

Это лучший из возможных способов опроса со стороны клиента.

РЕДАКТИРОВАТЬ 1

private prevRequestTime: number;

ngAfterViewInit(): void {
    this.getDataFromServer();
}

getDataFromServer() {
    this.prevRequestTime = Date.now();
    // do your network call
    this.http.get<any>(this.url, {params})
            .subscribe((data) => {
                console.log('s', data);
                this.scheduleRequestAgain();
            }, (error) => {
                console.log('e', error);
                this.scheduleRequestAgain();
            }); 
}

scheduleRequestAgain() {
    let diff = Date.now() - this.prevRequestTime;
    setTimeout(this.getDataFromServer(), diff);
}
0 голосов
/ 01 июня 2018

Кажется, проблема здесь в том, что у вас закончились соединения в пуле базы данных.

Ваш метод помечен @Transaction, но ваш контроллер также ожидает результат метода, то естьDeferredResult должен быть доставлен как можно скорее, так что поток освобождается.

Теперь, вот что происходит при запуске запроса:

  • Функциональность @Transactionреализован в Spring-прокси, который должен открыть соединение, вызвать ваш субъектный метод и затем зафиксировать или откатить транзакцию.
  • Поэтому, когда ваш контроллер вызывает метод fooService.pull, он фактически вызывает прокси.
  • Прокси-сервер должен сначала запросить соединение из пула, затем он вызывает метод службы, который в рамках этой транзакции выполняет некоторую операцию с базой данных.Наконец, он должен зафиксировать или откатить транзакцию и, наконец, вернуть соединение с пулом.
  • После всего этого ваш метод возвращает DeferredResult, который затем передается контроллеру для возврата.

Теперь проблема в том, что DeferredResult разработан таким образом, что его следует использовать асинхронно.Другими словами, обещание, как ожидается, будет разрешено позже в каком-то другом потоке, и мы должны освободить поток запроса как можно скорее.

Фактически, документация Spring для DeferredResult говорит:

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = new DeferredResult<String>();
    // Save the deferredResult somewhere..
    return deferredResult;
}

// From some other thread...
deferredResult.setResult(data);

Проблема в вашем коде заключается именно в том, что DeferredResult решается в том же потоке запросов.

Итак, дело в том, что когда прокси Spring запрашиваетПри подключении к пулу базы данных, когда вы выполняете тесты с высокой нагрузкой, многие запросы обнаружат, что пул заполнен и нет доступных подключений.Таким образом, запрос приостановлен, но на данный момент ваш DeferredResult еще не создан, поэтому его функции тайм-аута не существует.

Ваш запрос в основном ожидает, чтобы какое-то соединение из пула базы данных сталоимеется в наличии.Итак, допустим, прошло 5 секунд, затем запрос получает соединение, и теперь вы получаете DeferredResult, который контроллер использует для обработки ответа.В конце концов, через 5 секунд тайм-аут.Поэтому вам нужно добавить время ожидания соединения из пула и время ожидания разрешения DeferredResult.

Вот почему вы, вероятно, видите, что при тестировании с JMeter время запроса постепенно увеличивается по мере истощения соединений из пула базы данных.

Вы можете включить ведение журнала для пула потоков, добавивследующий файл application.properties:

logging.level.com.zaxxer.hikari=DEBUG

Вы также можете настроить размер пула базы данных и даже добавить некоторую поддержку JMX, чтобы вы могли отслеживать ее из Java Mission Control:

spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true

Используя поддержку JMX, вы сможете увидеть, как истощается пул базы данных.

Хитрость заключается в переносе логики, которая разрешает обещание, в другой поток:

@Override
public DeferredResult pull(Long previousId, String username) {


    DeferredResult result = createPollingResult(previousId, username);

    CompletableFuture.runAsync(() -> {
        //this is where you encapsulate your db transaction
        List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
        if (messages.isEmpty()) {
           pollingResults.putIfAbsent(username, result);
        } else {
           result.setResult(messages);
        }
    });

    return result;
}

Таким образом, ваш DeferredResult немедленно возвращается, и Spring может использовать магию асинхронной обработки запросов, пока он освобождает этот драгоценный поток Tomcat.

0 голосов
/ 01 июня 2018

Я думаю, вам нужна модель структуры производителя и потребителя.Я пишу код для вас.Я надеюсь, что это поможет вам.

Это пример кода:

DeferredResultStrore

@Component
public class DeferredResultStrore {

    private Queue<DeferredResult<String>> responseBodyQueue;
    private HashMap<String, List<DeferredResult<InterfaceModel>>> groupMap;
    private final long resultTimeOut;

    public DeferredResultStrore() {
        responseBodyQueue = new LinkedBlockingQueue<DeferredResult<String>>();
        groupMap = new HashMap<String, List<DeferredResult<InterfaceModel>>>();
        // write time.
        resultTimeOut = 1000 * 60 * 60;
    }

    public Queue<DeferredResult<String>> getResponseBodyQueue() {
        return responseBodyQueue;
    }

    public HashMap<String, List<DeferredResult<InterfaceModel>>> getGroupMap() {
        return groupMap;
    }

    public long getResultTimeOut() {
        return resultTimeOut;
    }

}

DeferredResultService

public interface DeferredResultService {

    public DeferredResult<?> biteResponse(HttpServletResponse resp, HttpServletRequest req);

    public DeferredResult<?> biteGroupResponse(String key, HttpServletResponse resp);

}

DeferredResultServiceImpl

@Service
public class DeferredResultServiceImpl implements DeferredResultService {

    @Autowired
    private DeferredResultStrore deferredResultStore;

    @Override
    public DeferredResult<?> biteResponse(final HttpServletResponse resp, HttpServletRequest req) {

        final DeferredResult<String> defResult = new DeferredResult<String>(deferredResultStore.getResultTimeOut());

        removeObserver(resp, defResult, null);

        deferredResultStore.getResponseBodyQueue().add(defResult);

        return defResult;
    }

    @Override
    public DeferredResult<?> biteGroupResponse(String key, final HttpServletResponse resp) {

        final DeferredResult<InterfaceModel> defResult = new DeferredResult<InterfaceModel>(
                deferredResultStore.getResultTimeOut());

        List<DeferredResult<InterfaceModel>> defResultList = null;

        removeObserver(resp, defResult, key);

        if (deferredResultStore.getGroupMap().containsKey(key)) {

            defResultList = deferredResultStore.getGroupMap().get(key);
            defResultList.add(defResult);

        } else {

            defResultList = new ArrayList<DeferredResult<InterfaceModel>>();
            defResultList.add(defResult);
            deferredResultStore.getGroupMap().put(key, defResultList);

        }

        return defResult;
    }

    private void removeObserver(final HttpServletResponse resp, final DeferredResult<?> defResult, final String key) {

        defResult.onCompletion(new Runnable() {
            public void run() {
                if (key != null) {
                    List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);

                    if (defResultList != null) {
                        for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
                            if (deferredResult.hashCode() == defResult.hashCode()) {
                                defResultList.remove(deferredResult);
                            }
                        }
                    }

                } else {
                    if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
                        deferredResultStore.getResponseBodyQueue().remove(defResult);
                    }
                }
            }
        });

        defResult.onTimeout(new Runnable() {
            public void run() {
                // 206
                resp.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);

                if (key != null) {
                    List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);

                    if (defResultList != null) {
                        for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
                            if (deferredResult.hashCode() == defResult.hashCode()) {

                                InterfaceModel model = new InterfaceModel();
                                model.setId(key);
                                model.setMessage("onTimeout");

                                deferredResult.setErrorResult(model);
                                defResultList.remove(deferredResult);
                            }
                        }
                    }

                } else {
                    defResult.setErrorResult("onTimeout");
                    deferredResultStore.getResponseBodyQueue().remove(defResult);
                }
            }
        });
    }

}

PushService

public interface PushService {

    public boolean pushMessage(String message);

    public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp);

}

PushServiceImpl

@Service
public class PushServiceImpl implements PushService {

    @Autowired
    private DeferredResultStrore deferredResultStore;

    @Override
    public boolean pushMessage(String message) {

        if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {

            for (DeferredResult<String> deferredResult : deferredResultStore.getResponseBodyQueue()) {

                deferredResult.setResult(message);
            }

            deferredResultStore.getResponseBodyQueue().remove();
        }

        return true;
    }

    @Override
    public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp) {
        List<DeferredResult<InterfaceModel>> defResultList = null;

        // select data in DB. that is sample group push service. need to connect db.
        InterfaceModel model = new InterfaceModel();
        model.setMessage("write group message.");
        model.setId(key);

        if (deferredResultStore.getGroupMap().containsKey(key)) {
            defResultList = deferredResultStore.getGroupMap().get(key);

            for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
                deferredResult.setResult(model);
            }

            deferredResultStore.getGroupMap().remove(key);
        }

        return true;
    }

}

InterfaceModel

public class InterfaceModel {

    private String message;

    private int idx;
    private String id;

    // DB Column

    public InterfaceModel() {
        // TODO Auto-generated constructor stub
    }

    public InterfaceModel(String message, int idx, String id) {
        this.message = message;
        this.idx = idx;
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public int getIdx() {
        return idx;
    }

    public void setIdx(int idx) {
        this.idx = idx;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

}

web.xml

Асинхронная поддержка очень важна в настройках.

<servlet>
    <servlet-name>appServlet</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
</servlet>

Java base

@Bean
public ServletRegistrationBean dispatcherServlet() {
    ServletRegistrationBean registration = new ServletRegistrationBean(
            new DispatcherServlet(), "/");
    registration.setAsyncSupported(true);
    return registration;
}

Фактически:

DeferredResult связан с открытым запросом.Когда запрос завершается, DeferredResult удаляется с карты, а затем клиент отправляет новый длинный запрос опроса, который добавляет новый экземпляр DeferredResult


Spring Boot автоматически регистрирует любой сервлетbean-компоненты в контексте вашего приложения с контейнером сервлета.По умолчанию для поддерживаемой асинхронности установлено значение true, поэтому вам нечего делать, кроме создания бина для вашего сервлета.

@ Aligtor, для вас => public @interface EnableAsync Включает асинхронный метод Springвозможность выполнения, аналогичная функциональности, найденной в пространстве имен Spring в XML.

...