Отправить большой набор данных с помощью Undertow WebSockets эффективно - PullRequest
0 голосов
/ 06 ноября 2019

У меня есть большой ConcurrentHashMap (cache.getCache()), где я храню все свои данные (размер около 500 МБ, но со временем он может увеличиться). Это доступно для клиентов через API, реализованный с использованием простого Java HttpServer. Вот упрощенный код:

JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);

Есть также некоторые фильтры, которые отправляют клиенты, чтобы они фактически не получали все данные каждый раз, но HashMap постоянно обновляется, поэтому клиентам приходится часто обновлять, чтобы иметь последниеданные. Это неэффективно, поэтому я решил отправить обновления данных клиентам в режиме реального времени с помощью WebSockets.

Я выбрал Undertow для этого, потому что я могу просто импортировать его из Maven, и мне не нужно выполнять никаких дополнительных настроексервер.

При подключении WS я добавляю канал в HashSet и отправляю весь набор данных (клиент отправляет сообщение с некоторыми фильтрами перед получением исходных данных, но я удалил эту часть из примера):

public class MyConnectionCallback implements WebSocketConnectionCallback {
  CacheContainer cache;
  Set<WebSocketChannel> clients = new HashSet<>();
  BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public MyConnectionCallback(CacheContainer cache) {
    this.cache = cache;
    Thread pusherThread = new Thread(() -> {
      while (true) {
        push(queue.take());
      }
    });
    pusherThread.start();
  }

  public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
    webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
      protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
        clients.add(webSocketChannel);
        WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
      }
    }
  }

  private void push(String message) {
    Set<WebSocketChannel> closed = new HashSet<>();
    clients.forEach((webSocketChannel) -> {
        if (webSocketChannel.isOpen()) {
            WebSockets.sendText(message, webSocketChannel, null);
        } else {
            closed.add(webSocketChannel);
        }
    }
    closed.foreach(clients::remove);
  }

  public void putMessage(String message) {
    queue.put(message);
  }
}

После каждого изменения в моем кэше я получаю новое значение и помещаю его в очередь (я не сериализую напрямую объект myUpdate, потому что в методе updateCache есть другая логика). За обновление кэша отвечает только один поток:

cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));

Проблемы, с которыми я сталкиваюсь при таком подходе:

  1. при первоначальном подключении весь набор данных преобразуется в строку, и яопасайтесь, что слишком много запросов может привести к тому, что сервер станет OOM. WebSockets.sendText принимает только String и ByteBuffer
  2. , если я добавлю канал к клиенту, установленному первым, а затем отправлю данные, отправка может быть передана клиенту до отправки исходных данных, и клиент будетнедопустимое состояние
  3. , если я сначала отправлю исходные данные, а затем добавлю канал в набор клиентов, push-сообщения, поступающие во время отправки исходных данных, будут потеряны, а клиент окажется в недопустимом состоянии

Решение, которое я нашел для задач № 2 и № 3, состоит в том, чтобы поместить сообщения в очередь (я бы преобразовал Set<WebSocketChannel> в Map<WebSocketChannel,Queue<String>> и отправлял сообщения в очереди только послеклиент получает начальный набор данных, но я приветствую любые другие предложения здесь.

Что касается проблемы # 1, мой вопрос заключается в том, какой будет наиболее эффективный способ отправки исходных данных через WebSocket? Например, что-то вроде написания сJsonWriter непосредственно в WebSocket.

Я понимаю, что клиенты могут сделать первоначальный вызов, используя API и подписаться на WebSocket для изменений, но этот подход заставляет клиентов отвечать за правильное состояние (им необходимо подписаться на WS, поставить в очередь сообщения WS, получить исходные данные с помощью API, а затем применить сообщения WS в очереди к своему набору данных после полученияисходные данные), и я не хочу оставлять это за ними, потому что данные чувствительны.

Ответы [ 2 ]

1 голос
/ 06 ноября 2019

Кажется, проблема № 2 и № 3 связана с тем, что разные потоки могут одновременно отправлять состояние данных клиенту. Таким образом, в дополнение к вашему подходу, вы можете рассмотреть два других подхода к синхронизации.

  1. использовать мьютекс для защиты доступа к данным и отправки клиентом. Это сериализует чтение и отправку данных клиентам, поэтому (псевдо) код становится таким:
protected void onFullTextMessage(...) {
   LOCK {
     clients.add(webSocketChannel);
     WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
   }
}

void push(String message) {
    Set<WebSocketChannel> closed = new HashSet<>();
    LOCK {
      clients.forEach((webSocketChannel) -> {
          if (webSocketChannel.isOpen()) {
              WebSockets.sendText(message, webSocketChannel, null);
          } else {
              closed.add(webSocketChannel);
          }
      }
    }
    closed.foreach(clients::remove);
}
создать новый поток класса и службы, который несет единоличную ответственность за управление изменениями в кэше данных и передачу этих изменений клиентам;он будет использовать внутреннюю синхронизированную очередь для асинхронной обработки вызовов методов, а также отслеживает подключенных клиентов, у него будет такой интерфейс:
public void update_cache(....);
public void add_new_client(WebSocketChannel);

... каждый из этих вызовов запрашиваетоперация, которая должна быть завершена во внутреннем потоке объекта. Это гарантирует упорядочение исходного снимка и обновлений, потому что только один поток выполняет работу по изменению кэша и распространению этих изменений среди подписчиков.

Что касается # 1, если вы использовали подход # 2, то вы могли быкэшируйте сериализованное состояние ваших данных, что позволяет повторно использовать их на более поздних снимках (при условии, что они не были изменены за это время). Как отмечено в комментарии: это будет работать только в том случае, если у более поздних клиентов будет такая же конфигурация фильтра.

0 голосов
/ 11 ноября 2019

Для решения проблем № 2 и № 3 я установил флаг принудительной блокировки на каждом клиенте, который разблокируется только при отправке исходных данных. Когда принудительная блокировка установлена, поступающие сообщения помещаются в очередь этих клиентов. Сообщения в очереди затем отправляются перед любыми новыми сообщениями.

Я уменьшил проблему # 1, используя ByteBuffer напрямую вместо String. Таким образом, я могу сэкономить память из-за кодировки (String использует UTF-16 по умолчанию)

Окончательный код:

public class WebSocketClient {
  private boolean pushLock;
  private Gson gson;
  private Queue<CacheContainer> queue = new ConcurrentLinkedQueue<>();

  WebSocketClient(MyQuery query, CacheHandler cacheHandler) {
    pushLock = true;
    this.gson = GsonFactory.getGson(query, cacheHandler);
  }

  public synchronized boolean isPushLock() {
    return pushLock;
  }

  public synchronized void pushUnlock() {
    pushLock = false;
  }

  public Gson getGson() {
    return gson;
  }

  public Queue<CacheContainer> getQueue() {
    return queue;
  }

  public boolean hasBackLog() {
    return !queue.isEmpty();
  }
}

public class MyConnectionCallback implements WebSocketConnectionCallback {

  private final Map<WebSocketChannel, WebSocketClient> clients = new ConcurrentHashMap<>();
  private final BlockingQueue<CacheContainer> messageQueue = new LinkedBlockingQueue<>();

  private final Gson queryGson = new GsonBuilder().disableHtmlEscaping().create();

  private final CacheHandler cacheHandler;

  MyConnectionCallback(CacheHandler cacheHandler) {
    this.cacheHandler = cacheHandler;
    Thread pusherThread = new Thread(() -> {
      boolean hasPushLock = false;
      while (true) {
        if (messageQueue.isEmpty() && hasPushLock) hasPushLock = pushToAllClients(null);
        else hasPushLock = pushToAllClients(messageQueue.take());
      }
    }, "PusherThread");
    pusherThread.start();
  }

  @Override
  public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
    webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
      @Override
      protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException {
        MyQuery query = new MyQuery(queryGson.fromJson(message.getData(), QueryJson.class));
        WebSocketClient clientConfig = new WebSocketClient(query, cacheHandler);
        clients.put(webSocketChannel, clientConfig);
        push(webSocketChannel, clientConfig.getGson(), cacheHandler.getCache());
        clientConfig.pushUnlock();
        }
    });
    webSocketChannel.resumeReceives();
  }

  void putMessage(CacheContainer message) {
    messageQueue.put(message);
  }

  private synchronized void push(WebSocketChannel webSocketChannel, Gson gson, CacheContainer message) throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
      JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8))) {
      gson.toJson(message, CacheContainer.class, jsonWriter);
      jsonWriter.flush();
      if (baos.size() > 2) {
        WebSockets.sendText(ByteBuffer.wrap(baos.toByteArray()), webSocketChannel, null);
      }
    }
  }

  private synchronized boolean pushToAllClients(CacheContainer message) {
    AtomicBoolean hadPushLock = new AtomicBoolean(false);
    Set<WebSocketChannel> closed = new HashSet<>();

    clients.forEach((webSocketChannel, clientConfig) -> {
      if (webSocketChannel.isOpen()) {
        if (clientConfig.isPushLock()) {
          hadPushLock.set(true);
          clientConfig.getQueue().add(message);
        } else {
          try {
            if (clientConfig.hasBackLog())
              pushBackLog(webSocketChannel, clientConfig);
            if (message != null)
              push(webSocketChannel, clientConfig.getGson(), message);
          } catch (Exception e) {
            closeChannel(webSocketChannel, closed);
          }
        }
      } else {
        closed.add(webSocketChannel);
      }
    });

    closed.forEach(clients::remove);
    return hadPushLock.get();
  }

  private void pushBackLog(WebSocketChannel webSocketChannel, WebSocketClient clientConfig) throws IOException {
    while (clientConfig.hasBackLog()) {
      push(webSocketChannel, clientConfig.getGson(), clientConfig.getQueue().poll());
    }
  }

  private void closeChannel(WebSocketChannel channel, Set<WebSocketChannel> closed) {
    closed.add(channel);
    channel.close();
  }
}
...