java-nats-streaming: публикация сообщений после переподключения сервера - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть потоковый кластер NATS с настроенными 3 узлами. Похоже, что сообщения NATS, публикуемые моим java-приложением во время простоя сервера, теряются (т.е. не переиздаются, когда мои серверы снова работают и работают).

Более подробное описание:

  1. Кластер NATS онлайн. Приложения издателя и подписчика приходят в онлайн. Издатель начинает публиковать сообщения каждую секунду. Подписчик получает сообщения.
  2. Серверы NATS отключены. Издатель продолжает публиковать сообщения (назовем эти сообщения «автономными сообщениями»). Абонент перестает получать что-либо
  3. Серверы NATS возвращаются в оперативный режим. Подписчик снова начинает получать сообщения, но «автономные сообщения» не принимаются.

Мои приложения издателя и подписчика настроены на попытку переподключения к серверу NATS и не имеют тайм-аута. Я не получаю никаких исключений во всем.

NATS-соединение:

Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();

Connection nc = Nats.connect(options);

StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();

Издатель:

// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());

Абонент:

streamingConnection.subscribe(subject, new MessageHandler() {
    public void onMessage(Message m) {
        System.out.prinf("Received msg: %s\n", m.getData())
    }
},  new SubscriptionOptions.Builder().durableName(durableName).build());

Из документов клиент Java NATS, похоже, имеет встроенный буфер переподключения. Я попытался увеличить буфер в 10 раз, но безрезультатно (также мои сообщения состоят только из 2-значных чисел). Как я могу заставить его отправить эти «автономные сообщения»?

1 Ответ

0 голосов
/ 14 февраля 2019

У меня та же проблема, единственное решение, которое я вижу, что занят другой метод подписки, сохраните последовательность сообщений, но это, я не думаю, является лучшим

   // Receive messages starting at a specific sequence number
   sc.subscribe("foo", new MessageHandler() {
   public void onMessage(Message m) {
     logger.info("Sequence message " +  m.getSequence());
     System.out.printf("Received a message: %s\n", m.getData());
   }
   }, new SubscriptionOptions.Builder().startAtSequence(22).build());
...