Почему порог моего автоматического выключателя влияет на QoS 1 в MQTT? - PullRequest
1 голос
/ 10 января 2020

В настоящее время я работаю над курсом по распределенным системам в python и javascript и пытаюсь использовать некоторую отказоустойчивость в моем коде, в частности QoS 1 и схему автоматического выключателя. У меня есть 3 компонента, и они отправляют сообщения, публикуя и подписываясь на темы, используя Paho-MQTT. Первый компонент генерирует и публикует случайные координаты отправления и назначения, второй компонент вызывает внешний API для получения информации о путешествии для этих координат и других функций фильтра, а третий визуализирует это.

Моя проблема в том, что третий после 19 сообщений компонент прекращает получать больше сообщений, а затем переходит к получению дубликатов для всех 19, если пороговое значение автоматического выключателя установлено на 5. Я предполагаю, что это как-то связано с подтверждениями PUBACK для QoS 1.

Вопрос
Если я изменю пороговое значение автоматического выключателя на 2, то он работает отлично, без остановки и отправки только дубликатов. Кто-нибудь знает, почему это происходит?

Код
Во втором компоненте я использовал автоматический выключатель для отбрасывания входящих сообщений, когда очередь заполнена с порогом сбоя 5. Клиент Paho в этом компоненте подписывается на темы с QoS 0:

from queue import Queue
from circuitbreaker import circuit

queue = Queue(50)

def on_connect(client, userdata, flags, rc):
    client.subscribe([(subscribe_topic, 0), (subscribe_topic_external, 0)])

@circuit(failure_threshold=5, recovery_timeout=15)
def on_message(client, userdata, msg):
    message = json.loads(msg.payload)
    queue.put(message, block=True, timeout=1)

После того, как данные проходят через API и другие функции фильтрации, они публикуются с использованием QoS 1:

while True:
    if not queue.empty():
        trip_request = queue.get()
        trip = pipe.transfer_data(trip_request)
        client.publish(publish_topic, json.dumps(trip), qos=1)

Третий компонент затем подписывается также с использованием QoS 1 и обрабатывает дубликаты в сообщении on:

client.on('connect', () => {
  console.log("connected");
  client.subscribe(topic, {qos: 1});
})

client.on('message', (topic, message, packet) => {
  let tripRequest = JSON.parse(message.toString())
  let requestId = tripRequest.requestId
  if (tripSet.has(requestId)) {
    console.log("Duplicate trip request found. Request ID: " + requestId)
  } else {
    tripSet.add(requestId)
    queue.push(JSON.parse(message.toString()));
  }
});
...