simpMessagingTemplate convertAndSendToUser много ожидающих потоков, блокирующих другие функции - PullRequest
0 голосов
/ 21 января 2019

Мы используем Stomp, SpringBoot и WebSockets в нашем приложении.Серверное приложение выполняет следующие действия: 1) Генерирует сообщения для отправки пользователям, 2) Принимает соединения WebSocket и 3) отправляет сообщения в ActiveMQ Stomp Broker.Дамп потока показывает множество ожидающих потоков, связанных с вызовом API simpMessagingTemplate convertAndSendToUser.

Два экземпляра приложения работают в облаке.Это приложение генерирует сообщения и отправляет их в ActiveMQ Stomp Broker (работает отдельно) с помощью simpMessagingTemplate convertAndSendToUser API.

Мы используем Gatling для моделирования пользовательских подключений WebSocket для нагрузочного тестирования.Гатлинг работает на отдельном экземпляре.Приложение отлично работает на 2000 пользовательских подключений.Как только мы увеличиваем количество пользователей до 4000, мы видим, что поток генерации сообщений останавливается.Пользователи подключаются к одним и тем же серверам без каких-либо проблем.

Если мы прокомментируем вызов API simpMessagingTemplate convertAndSendToUser, то все будет прекрасно (как генерировать сообщения, так и новые соединения WebSocket).Таким образом, мы сомневаемся в проблеме с convertAndSendToUser API.

Трассировка стека Threaddump приведена ниже:

"ForkJoinPool-1-worker-440" #477 daemon prio=5 os_prio=0 tid=0x00007f0c541c2800 nid=0x2a47 sleeping[0x00007f08e6371000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at reactor.util.concurrent.WaitStrategy$Sleeping.waitFor(WaitStrategy.java:319)
	at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:211)
	at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:176)
	at org.springframework.messaging.tcp.reactor.AbstractMonoToListenableFutureAdapter.get(AbstractMonoToListenableFutureAdapter.java:73)
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:980)
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:549)
	at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:234)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:105)
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
	at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:227)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:229)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:218)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:204)
	at com.mypackage.PushMessageManager.lambda$sendMyMessage$2(PushMessageManager.java:77)
	at com.mypackage.PushMessageManager$$Lambda$923/1850582969.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at com.mypackage.PushMessageManager.sendMyMessage(PushMessageManager.java:74)
	at com.mypackage.PushMessageManager.lambda$processPushMessage$0(PushMessageManager.java:61)
	at com.mypackage.PushMessageManager$$Lambda$664/624459498.run(Unknown Source)
	at nl.talsmasoftware.context.functions.RunnableWithContext.run(RunnableWithContext.java:42)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at nl.talsmasoftware.context.executors.ContextAwareExecutorService$1.call(ContextAwareExecutorService.java:59)
	at nl.talsmasoftware.context.delegation.RunnableAdapter.run(RunnableAdapter.java:44)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
	- None

Ниже приведены шаги с диаграммой:

  1. Gatling JMS издатель отправляет сообщения JMS со скоростью 20000 сообщений в минуту брокеру Active MQ.Обратите внимание, что эти сообщения предназначены не только для одного пользователя.он распространяется на основе пользовательских подключений WebSocket.
  2. Наше приложение имеет JMS-прослушиватель для получения этих сообщений.Мы запускаем, скажем, 2 экземпляра приложения, поэтому два JMS-слушателя обрабатывают это сообщение.
  3. Как только приложение получает сообщение JMS, оно проверяет информацию о сеансе из кэша, чтобы идентифицировать подключенных пользователей, и отправляет другому брокеру Stomp ActiveMQ.используя simpMessagingTemplate convertAndSendToUser API API simpMessagingTemplate.convertAndSendToUser (sessionId, "/ queue / abc", полезная нагрузка).Обратите внимание, что sessionId хранится в распределенном кэше, когда пользователь впервые подключается к приложению.Таким образом, это действительные идентификаторы сеанса.
  4. ActiveMQ Stomp Broker затем передает эти сообщения отдельным очередям пользователей Stomp.
  5. Клиент Gatling WebSocket (каждый из которых имеет 2000 пользовательских соединений) должен получать эти сообщения через соединения WebSocket.
  6. Клиентское подключение и подписка выглядит следующим образом

    stompClient.connect ({'username': $ ("# # userName"). Val ()}, function (frame) {setConnected (true); subscription = stompClient.subscribe ('/ user / queue / abc', function (message) {showData (JSON.parse (message.body));}, headers = {'loginusername': $ ("#userName "). val ()});});

Таким образом, каждый пользователь должен получать только предназначенные ему сообщения, а не все сообщения.По этой причине мы подключаем пользователей к отдельным очередям при подключении через WebSocket, а также используем convertAndSendToUser для отправки сообщений в определенные сеансы.Внутренний JMS-издатель гарантирует, что сообщения публикуются пользователям циклически.

Чтобы ответить на ваш вопрос относительно определения узкого места, если мы подключимся, скажем, 2000 пользователей, все работает отлично.Но когда мы добавляем больше пользователей, мы видим, что JMS-прослушиватель приложения не может прослушивать 20000 сообщений в минуту, отправляемых внутренним генератором нагрузки Gatling JMS.Из-за этого увеличивается глубина очереди ActiveMQ JMS.

Чтобы убедиться, что узким местом является API convertAndSendToUser, мы прокомментировали этот вызов API.Если мы сделаем это, мы сможем подключить ~ 13k подключений WebSocket, а внутренний слушатель JMS сможет также использовать все 20000 сообщений в минуту.

Надеюсь, это прояснит некоторые ваши вопросы. UPDATE Фрагмент кода для демонстрации асинхронного вызова API simpMessagingTemplate.convertAndSendToUser приведен ниже. Здесь RepositoryUtil.executor () является нашей собственной оболочкой для объекта executor.

    public CompletableFuture<Void> processPushMessage(String userName, String payload) {
    return ContextAwareCompletableFuture.runAsync(() -> {
        sendABCMessage(payload, userName);
    }, RepositoryUtil.executor());
}

public void sendABCMessage(@Payload String payload, String username) {
    ArrayList<UserProfiles> userProfiles = (ArrayList<UserProfiles>) cacheService.getValue(username);
    if (Objects.nonNull(userProfiles) && userProfiles.size() > 0) {
      userProfiles.parallelStream()
          .filter(userProfiles1 -> ("/user/queue/abc".equalsIgnoreCase(userProfiles1.getSubscribeMapping()) && username.equals(userProfiles1.getUserName())))
          .forEach(userProfiles1 -> {              simpMessagingTemplate.convertAndSendToUser(userProfiles1.getSessionId(), "/queue/abc", payload);
          });
    } else {
      LOGGER.info("sendABCMessage userProfiles is null. Payload: {}", payload);
    }
}

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019

Мы можем решить эту проблему, перейдя в / user / topic вместо / user / queue.Теперь мы можем обрабатывать ~ 35 тыс. Сообщений в минуту из серверных подключений через бэкэнд и 8 тыс. Сокетов.

0 голосов
/ 23 января 2019

Приложение прекрасно работает для 2000 пользовательских соединений с нагрузкой 20 000 сообщений в минуту.Как только мы увеличиваем количество пользователей до 4000, мы видим, что поток создания сообщений останавливается.

Если вы отправляете 20 000 сообщений в ActiveMQ, а каждое сообщение имеет 1000 подписчиков, это означает, что опубликовано 20 000 000 сообщений (1 000 * 20 000).вернуться к клиентам WebSocket.Поэтому постарайтесь определить общий объем проходящих сообщений и понять, где находится узкое место (сервер пересылает сообщения в ActiveMQ, обрабатывает сообщения ActiveMQ или сервер публикует сообщения для клиентов WebSocket).

Для 20000 сообщений они генерируются из одного потока или из большого числа различных потоков, например, в результате обработки сообщений от клиентов WebSocket или HTTP-вызовов REST?Если это последнее, возможно, слишком много потоков пытаются одновременно пересылать сообщения брокеру, и вам, возможно, придется применить какие-то ограничения скорости.

В конце дня вам необходимо понятьобщий объем, где находится узкое место и где применяются некоторые ограничения скорости.

...