Это приложение получает и пересылает сообщения из событий базы данных в клиентские приложения.Сообщения немедленно доставляются, когда в браузере клиента есть сеанс веб-сокета.
Однако, когда сеанс веб-сокета не существует и JMSProducer отправляет сообщение в пункт назначения «jms / messagesQueue» в QueueSenderSessionBean, сообщениенемедленно потребляется в NotificationEndpoint.Это не мое намерение.
Мое намерение состоит в том, чтобы очередь сохраняла сообщение до тех пор, пока пользователь не подключится к NotificationEndpoint.Если пользователь не подключен к NotificationEndpoint, я думаю, что не должно быть никакого экземпляра NotificationEndpoint, созданного для получения сообщения.
Как отложить JMSConsumer, потребляющий сообщение из очереди?
Обзор - проект TomEE Plus 8.0.0-M1
- Приложение получает уведомление в NotificationServlet HttpServletRequest
- Строковое сообщение помещается в очередь JMS с помощью QueueSenderSessionBean, вводимого в NotificationServlet
- NotificationMessageDrivenBean реализует MessageListener для прослушивания очереди JMS.
- Событие, аннотированное @NotificationServletJMSMessage, запускается из NotificationMessageDrivenBean для наблюдателя в методе NotificationEndpoint onJMSMessage.сообщение пользователю
- В PushContext.send, если какие-либо сеансы websocket со свойством uuid пользователя соответствуют сообщению user uuid propeВообще-то, сообщение доставляется в каждый сеанс websocket.
Насколько я понимаю, @ServerEndpoint заключается в том, что «каждый новый сеанс WS получает свой экземпляр». Уведомлять только определенных пользователей через WebSockets, когда что-то изменяется в базе данных
Источники: вышеуказанная ссылка из https://stackoverflow.com/users/157882/balusc и https://blogs.oracle.com/theaquarium/integrating-websockets-and-jms-with-cdi-events-in-java-ee-7-v2
WEB-INF / resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<resources>
<Resource id="jmsConnectionFactory" type="javax.jms.ConnectionFactory">
connectionMaxIdleTime = 15 Minutes
connectionMaxWaitTime = 5 seconds
poolMaxSize = 10
poolMinSize = 0
resourceAdapter = Default JMS Resource Adapter
transactionSupport = xa
</Resource>
</resources>
NotificationServlet.java
import java.io.IOException;
import java.util.UUID;
import javax.annotation.Resource;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.jms.Queue;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/notifications")
public class NotificationServlet extends HttpServlet
{
@Resource(name = "jms/notificationQueue")
private Queue _notificationQueue;
@Inject
private QueueSenderSessionBean _queueSessionSenderBean;
@Override
protected void doGet(HttpServletRequest request,
HttpServletResponse response)
throws ServletException,
IOException
{
try
{
String notificationJson =
extractNotificationJson(request);
if (notificationJson != null)
{
_queueSessionSenderBean.sendMessage(
"notification="
+ notificationJson);
}
}
catch (Exception e)
{
e.printStackTrace();
// handle exception
}
}
public String extractNotificationJson(HttpServletRequest request)
throws IOException
{
if(request.getParameter("notification") != null)
{
String[] notificationString =
request.getParameterValues("notification");
return notificationString[0];
}
return null;
}
}
QueueSenderSessionBean.java
import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.DeliveryMode;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import org.json.JSONObject;
@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
@Resource(mappedName = "jms/notificationQueue")
private Queue _notificationQueue;
@Inject
@JMSConnectionFactory("jmsConnectionFactory")
private JMSContext _jmsContext;
// Static Methods
// Member Methods
public void sendMessage(String message)
{
try
{
JMSProducer messageProducer =
_jmsContext.createProducer();
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
String userProperty = "someValue";
TextMessage textMessage = _jmsContext.createTextMessage(message);
textMessage.setStringProperty("userProperty", userProperty);
messageProducer.send(_notificationQueue, textMessage);
}
catch (JMSException e)
{
e.printStackTrace();
// handle jms exception
}
}
}
Классификатор уведомленийServletJMSMessage.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Qualifier;
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
public @interface NotificationServletJMSMessage
{
}
NotificationMessageDrivenBean.java
import javax.ejb.MessageDriven;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.Message;
import javax.jms.MessageListener;
@Named
@MessageDriven(mappedName = "jms/notificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
@Inject
@NotificationServletJMSMessage
Event<Message> jmsEvent;
@Override
public void onMessage(Message message)
{
jmsEvent.fire(message);
}
}
PushContext.java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.websocket.Session;
@ApplicationScoped
public class PushContext
{
@Inject
private JMSContext _jmsContext;
@Resource(mappedName = "jms/notificationQueue")
private Queue _notificationQueue;
private Map<String, Set<Session>> _sessions;
@PostConstruct
public void init()
{
_sessions = new ConcurrentHashMap<>();
}
public void add(Session session, String userUuid)
{
_sessions.computeIfAbsent(userUuid,
value -> ConcurrentHashMap.newKeySet()).add(session);
}
void remove(Session session)
{
_sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
}
public void send(Set<String> userUuids, Message message) throws JMSException
{
String userUuid = message.getStringProperty("userUuid");
userUuids.add(userUuid);
Set<Session> userSessions;
synchronized(_sessions)
{
userSessions = _sessions.entrySet().stream()
.filter(e -> userUuids.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
}
for (Session userSession : userSessions)
{
if (userSession.isOpen())
{
userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
}
}
}
public void removeSession(Session session)
{
String userUuid = (String)session.getUserProperties().get("userUuid");
_sessions.remove(userUuid, session);
}
}
NotificationEndpoint.java
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint
{
private static final Set<Session> SESSIONS =
Collections.synchronizedSet(new HashSet<Session>());
private QueueSenderSessionBean _senderBean;
@Inject
private PushContext _pushContext;
@Inject
public NotificationEndpoint(QueueSenderSessionBean senderBean)
{
_senderBean = senderBean;
}
@OnOpen
public void onOpen(Session session,
EndpointConfig configurator,
@PathParam(value = "tokenId") String userUuidString)
{
session.getUserProperties().put("userUuid", userUuidString);
_pushContext.add(session, userUuidString);
}
@OnMessage
public void onMessage(String message, Session session)
throws IOException
{
System.out.println("Message received: " + message);
_senderBean.sendMessage(message);
}
@OnClose
public void onClose(CloseReason reason, Session session)
{
System.out.println(
"Closing 'notificatioEndpoint due to "
+ reason.getReasonPhrase());
try
{
session.close();
}
catch (IOException e)
{
e.printStackTrace();
}
_pushContext.removeSession(session);
}
@OnError
public void error(Session session, Throwable t)
{
t.printStackTrace();
}
public static void sendToAllClients(String message)
{
synchronized (SESSIONS)
{
for (Session session : SESSIONS)
{
if (session.isOpen())
{
session.getAsyncRemote().sendText(message);
}
}
}
}
public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message)
{
Set<String> userUuids = new HashSet<String>();
try
{
_pushContext.send(userUuids, message);
}
catch (JMSException ex)
{
ex.printStackTrace();
Logger.getLogger(NotificationEndpoint.class.getName()).
log(Level.SEVERE, null, ex);
}
}
}
Спасибо, Тед С