В настоящее время я работаю над курсом по распределенным системам в python и javascript и пытаюсь использовать некоторую отказоустойчивость в моем коде, в частности QoS 1 и схему автоматического выключателя. У меня есть 3 компонента, и они отправляют сообщения, публикуя и подписываясь на темы, используя Paho-MQTT. Первый компонент генерирует и публикует случайные координаты отправления и назначения, второй компонент вызывает внешний API для получения информации о путешествии для этих координат и других функций фильтра, а третий визуализирует это.
Моя проблема в том, что третий после 19 сообщений компонент прекращает получать больше сообщений, а затем переходит к получению дубликатов для всех 19, если пороговое значение автоматического выключателя установлено на 5. Я предполагаю, что это как-то связано с подтверждениями PUBACK для QoS 1.
Вопрос
Если я изменю пороговое значение автоматического выключателя на 2, то он работает отлично, без остановки и отправки только дубликатов. Кто-нибудь знает, почему это происходит?
Код
Во втором компоненте я использовал автоматический выключатель для отбрасывания входящих сообщений, когда очередь заполнена с порогом сбоя 5. Клиент Paho в этом компоненте подписывается на темы с QoS 0:
from queue import Queue
from circuitbreaker import circuit
queue = Queue(50)
def on_connect(client, userdata, flags, rc):
client.subscribe([(subscribe_topic, 0), (subscribe_topic_external, 0)])
@circuit(failure_threshold=5, recovery_timeout=15)
def on_message(client, userdata, msg):
message = json.loads(msg.payload)
queue.put(message, block=True, timeout=1)
После того, как данные проходят через API и другие функции фильтрации, они публикуются с использованием QoS 1:
while True:
if not queue.empty():
trip_request = queue.get()
trip = pipe.transfer_data(trip_request)
client.publish(publish_topic, json.dumps(trip), qos=1)
Третий компонент затем подписывается также с использованием QoS 1 и обрабатывает дубликаты в сообщении on:
client.on('connect', () => {
console.log("connected");
client.subscribe(topic, {qos: 1});
})
client.on('message', (topic, message, packet) => {
let tripRequest = JSON.parse(message.toString())
let requestId = tripRequest.requestId
if (tripSet.has(requestId)) {
console.log("Duplicate trip request found. Request ID: " + requestId)
} else {
tripSet.add(requestId)
queue.push(JSON.parse(message.toString()));
}
});