Асинхронный базовый локальный с MQTT - PullRequest
0 голосов
/ 29 марта 2019

Мне нужно синхронизировать базовый и локальный клиент с MQTT. Если клиент публикует, другой получит сообщение.

  • Если мой MQTT-брокер не работает, мне нужно прекратить отправку сообщений, сохранить их где-нибудь, дождаться соединения и продолжить отправку.

  • Если мой локальный или базовый клиент не работает в течение секунды, мне нужно сохранить сообщение, которое я не отправил, а затем отправить его, когда я включу свой базовый / локальный.

Я работаю с Node.js и не могу понять, как это реализовать.

Это мой обработчик, когда я подключаюсь или отключаюсь от моего сервера MQTT.

    client.on('connect',()=>{
      store.state = true;
      run(store).then((value)=>console.log('stop run'));
    });

    client.on('offline',()=>{
      store.state = false;
      console.log('offline');
    });

Это моя run функция. Я использую store.state, чтобы решить, должен ли я остановить этот интервал. Но этот код не кажется хорошим способом реализации моей концепции.

function run(store) {
  return new Promise((resolve,reject)=>{
    let interval = setInterval(()=>{
      if (!store.state) {
        clearInterval(interval);
        resolve(true);
      }
      else if (store.queue.length > 0) {
        let data = store.queue.pop();
        let res = client.publish('push',JSON.stringify(data),{qos:2});
      }
    },300)
  });
}

Что я должен сделать, чтобы реализовать функцию, которая всегда отправляет, останавливается при «отключении», а затем продолжает отправку при подключении?

1 Ответ

0 голосов
/ 02 апреля 2019

Я не думаю, что установить интервал, который 300 мс это хорошо.

Если вы хотите что-то, что «всегда работает», с установленными интервалами и несмотря на любые ошибки внутри цикла, setInterval() имеет смысл. Вы правы в том, что сообщения в очереди можно очистить быстрее, чем «раз в 300 мс».

Поскольку MQTT.js имеет встроенную очередь , вы можете значительно упростить ее использование. Тем не менее, ваши сообщения публикуются для цели, называемой «push», поэтому я думаю, вы хотите, чтобы они доставлялись в порядке очереди. Этот ответ сохраняет очередь и фокусируется на отправке следующего сообщения, как только последнее будет подтверждено.

Что если res = client.publish (..) false?

Хороший вопрос! Если вы хотите убедиться, что он прибыл, лучше удалить его после успешного завершения публикации. Для этого вам нужно извлечь значение, не удаляя его, и использовать аргумент обратного вызова, чтобы выяснить, что произошло (publish() является асинхронным). Если бы это было единственное изменение, оно могло бы выглядеть так:

let data = store.queue[store.queue.length - 1];
client.publish('push', JSON.stringify(data), {qos:2}, (err) => {
  if(!err) {
    store.queue.pop();
  }
  // Ready for next publish; call this function again
});

Расширение, чтобы включить запуск на основе обратного вызова:

function publishFromQueue(data) {
  return new Promise((resolve,reject)=>{
    let res = client.publish('push', JSON.stringify(data), {qos:2}, (err) => {
      resolve(!err);
    });
  });
}

async function run(store) {
  while (store.queue.length > 0 && store.state) {
    let data = store.queue[store.queue.length - 1];
    let res = await publishFromQueue(data);
    if(res) {
      store.queue.pop();
    }
  }
}

Это должно доставить все поставленные в очередь сообщения в порядке, как можно скорее, без блокировки. Единственным недостатком является то, что он не работает постоянно. У вас есть два варианта:

  1. Повторять через заданные интервалы, как вы уже сделали. Медленнее, хотя вы можете установить более короткий интервал.
  2. Только run() при необходимости, например:
let isRunning = false; //Global for tracking state of running

function queueMessage(data) {
  store.queue.push(data);
  if(!isRunning) {
    isRunning = true;
    run(store);
  }
  isRunning = false;
}

Пока вы можете использовать это вместо отправки в очередь, оно должно получаться аналогичной длины, более быстрым и эффективным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...