У меня есть большой ConcurrentHashMap (cache.getCache()
), где я храню все свои данные (размер около 500 МБ, но со временем он может увеличиться). Это доступно для клиентов через API, реализованный с использованием простого Java HttpServer. Вот упрощенный код:
JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);
Есть также некоторые фильтры, которые отправляют клиенты, чтобы они фактически не получали все данные каждый раз, но HashMap постоянно обновляется, поэтому клиентам приходится часто обновлять, чтобы иметь последниеданные. Это неэффективно, поэтому я решил отправить обновления данных клиентам в режиме реального времени с помощью WebSockets.
Я выбрал Undertow для этого, потому что я могу просто импортировать его из Maven, и мне не нужно выполнять никаких дополнительных настроексервер.
При подключении WS я добавляю канал в HashSet и отправляю весь набор данных (клиент отправляет сообщение с некоторыми фильтрами перед получением исходных данных, но я удалил эту часть из примера):
public class MyConnectionCallback implements WebSocketConnectionCallback {
CacheContainer cache;
Set<WebSocketChannel> clients = new HashSet<>();
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public MyConnectionCallback(CacheContainer cache) {
this.cache = cache;
Thread pusherThread = new Thread(() -> {
while (true) {
push(queue.take());
}
});
pusherThread.start();
}
public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
}
private void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
closed.foreach(clients::remove);
}
public void putMessage(String message) {
queue.put(message);
}
}
После каждого изменения в моем кэше я получаю новое значение и помещаю его в очередь (я не сериализую напрямую объект myUpdate
, потому что в методе updateCache есть другая логика). За обновление кэша отвечает только один поток:
cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));
Проблемы, с которыми я сталкиваюсь при таком подходе:
- при первоначальном подключении весь набор данных преобразуется в строку, и яопасайтесь, что слишком много запросов может привести к тому, что сервер станет OOM. WebSockets.sendText принимает только String и ByteBuffer
- , если я добавлю канал к клиенту, установленному первым, а затем отправлю данные, отправка может быть передана клиенту до отправки исходных данных, и клиент будетнедопустимое состояние
- , если я сначала отправлю исходные данные, а затем добавлю канал в набор клиентов, push-сообщения, поступающие во время отправки исходных данных, будут потеряны, а клиент окажется в недопустимом состоянии
Решение, которое я нашел для задач № 2 и № 3, состоит в том, чтобы поместить сообщения в очередь (я бы преобразовал Set<WebSocketChannel>
в Map<WebSocketChannel,Queue<String>>
и отправлял сообщения в очереди только послеклиент получает начальный набор данных, но я приветствую любые другие предложения здесь.
Что касается проблемы # 1, мой вопрос заключается в том, какой будет наиболее эффективный способ отправки исходных данных через WebSocket? Например, что-то вроде написания сJsonWriter непосредственно в WebSocket.
Я понимаю, что клиенты могут сделать первоначальный вызов, используя API и подписаться на WebSocket для изменений, но этот подход заставляет клиентов отвечать за правильное состояние (им необходимо подписаться на WS, поставить в очередь сообщения WS, получить исходные данные с помощью API, а затем применить сообщения WS в очереди к своему набору данных после полученияисходные данные), и я не хочу оставлять это за ними, потому что данные чувствительны.