Как накапливать сообщения как mqtt-клиент за 1 секунду, а затем сохранять их в файл - PullRequest
0 голосов
/ 07 августа 2020

Моя проблема заключается в следующем: я написал программу, которая подписывается на топи c, где 2 словаря с одним ключом соответственно приходят чаще раз в секунду. При каждом сообщении они меняют свою ценность. Я сохраняю эти словари в большом буферном словаре под названием «Статус». Мне нужно каждую секунду сохранять "снимок" состояния в файл.

Я пробовал time.sleep (1), но он дрейфует. И я не знаю, как решить проблему с расписанием из-за уже существующего client-l oop ...

Я новичок в python и mqtt и буду признателен за вашу помощь

Мой код:

import paho.mqtt.client as mqtt
import time
import json

Status = {}

#create client instance
client = mqtt.Client(client_id=None, clean_session=True, transport="tcp")

#connect to broker
client.connect("my_broker", 1883)

#use subscribe() to subscribe to a topic and receive messages
client.subscribe("topic/#", qos=0)

def test1_callback(client, userdata, msg):
    msg_dict = json.loads((msg.payload))
    Status.update(msg_dict)

client.message_callback_add("topic/test1", test1_callback)

while True:
    client.loop_start()
    time.sleep(1)
    client.loop_stop()

    with open('Data.txt', 'a+') as file:
        t = time.localtime()
        Status["time"]= time.strftime("%H:%M:%S", t)
            file.write(str(Status["time"]) + " ")
            file.write(str(Status["key1"]) + " ")
            file.write(str(Status["key2"]) + " ")

    client.loop_start()

1 Ответ

0 голосов
/ 10 августа 2020

Вместо того, чтобы вручную останавливать сетевой поток, я бы предпочел использовать таймер, который срабатывает каждую секунду. Кроме того, может быть хорошей идеей заблокировать данные при их сохранении в файл - в противном случае может произойти обновление между:

# ...
import threading

def test1_callback(client, userdata, msg):
   msg_dict = json.loads((msg.payload))
   lock.acquire()
   Status.update(msg_dict)
   lock.release()

def timer_event():
   lock.acquire()
   # save to file here
   lock.release()
   # restart timer
   threading.Timer(1, timer_event).start()

Status = {}
lock = threading.Lock()

# client initialization
# ...

client.loop_start()
threading.Timer(1, timer_event).start()

while True:
   pass

Но это не помешает вашему сохраненному значению уйти, потому что topi c очевидно публикуется слишком часто, поэтому ваш подписчик (или даже брокер) не может обработать сообщение достаточно быстро. опубликовано. Также обратите внимание, что вы подписались на многоуровневую топовую c - даже если темы помимо "topic/test1" не обрабатываются в вашем коде, они по-прежнему вызывают нагрузку на брокера и подписавшегося клиента

...