Как запустить 2 разных цикла в двух разных потоках? - PullRequest
0 голосов
/ 14 июня 2019

Я работаю с приложением телеметрии, используя Azure IoT Hub, Azure IoT SDK в Python и Raspberry Pi с датчиками температуры и влажности.

Влажность + Датчики температуры => Rasperry Pi => Azure IoT Hub

В моей первой реализации благодаря примерам Azure я использовал один цикл, который собирает данные с датчика температуры и датчика влажности и отправляет их в концентратор IoT Azure в одно и то же время каждые 60 секунд.

>>> 1 Loop every 60s = Collect data & send data of temperature and humidity

Теперь я хотел бы отправить их с разными частотами, я имею в виду: одна петля будет собирать данные датчика температуры и отправлять их в Azure IoT Hub каждые 60 секунд;В то время как второй цикл будет собирать данные датчика влажности и отправлять их в Azure IoT Hub каждые 600 секунд.

>>> 1 Loop every 60s= Collect data & send data of temperature
>>> 2 Loop every 600s= Collect data & send data of humidity 

Я думаю, что мне нужен инструмент многопоточности, но я не понимаю, какойбиблиотека или структура, которую я должен реализовать в моем случае.

Вот код, предоставляемый Azure, включая один цикл, который одновременно обрабатывает температуру и влажность.Чтение данных и отправка в Azure каждые 60 секунд.

import random
import time
import sys

# Using the Python Device SDK for IoT Hub:
from iothub_client import IoTHubClient, IoTHubClientError, 
IoTHubTransportProvider, IoTHubClientResult
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, 
IoTHubError, DeviceMethodReturnValue

# The device connection string to authenticate the device with your IoT hub.
CONNECTION_STRING = "{Your IoT hub device connection string}"

# Using the MQTT protocol.
PROTOCOL = IoTHubTransportProvider.MQTT
MESSAGE_TIMEOUT = 10000

# Define the JSON message to send to IoT Hub.
TEMPERATURE = 20.0
HUMIDITY = 60
MSG_TXT = "{\"temperature\": %.2f,\"humidity\": %.2f}"

def send_confirmation_callback(message, result, user_context):
    print ( "IoT Hub responded to message with status: %s" % (result) )

def iothub_client_init():
    # Create an IoT Hub client
   client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
   return client

def iothub_client_telemetry_sample_run():

    try:
        client = iothub_client_init()
        print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" )

   #******************LOOP*******************************    
   while True:
            # Build the message with simulated telemetry values.
            temperature = TEMPERATURE + (random.random() * 15)
            humidity = HUMIDITY + (random.random() * 20)
            msg_txt_formatted = MSG_TXT % (temperature, humidity)
            message = IoTHubMessage(msg_txt_formatted)

            # Send the message.
            print( "Sending message: %s" % message.get_string() )
            client.send_event_async(message, send_confirmation_callback, None)
            time.sleep(60)

    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "IoTHubClient sample stopped" )

if __name__ == '__main__':
    print ( "IoT Hub Quickstart #1 - Simulated device" )
    print ( "Press Ctrl-C to exit" )
    iothub_client_telemetry_sample_run()

Я хотел бы использовать одну и ту же структуру функций, включая два цикла, которые обрабатывают температуру и влажность, один каждые 60 с и один каждые 600 с.

while True:
    # Build the message with simulated telemetry values.
    temperature = TEMPERATURE + (random.random() * 15)
    msg_txt_formatted1 = MSG_TXT1 % (temperature)
    message1 = IoTHubMessage(msg_txt_formatted1)
    # Send the message.
    print( "Sending message: %s" % message1.get_string() )
    client.send_event_async(message1, send_confirmation_callback, None)
    time.sleep(60)

while True:
    # Build the message with simulated telemetry values.
    humidity = HUMIDITY + (random.random() * 20)
    msg_txt_formatted2 = MSG_TXT2 % (humidity)
    message2 = IoTHubMessage(msg_txt_formatted2)
    # Send the message.
    print( "Sending message: %s" % message2.get_string() )
    client.send_event_async(message2, send_confirmation_callback, None)
    time.sleep(600)

Как я могу это сделать?Как вызвать эти циклы с многопоточностью или другим методом?

Ответы [ 2 ]

0 голосов
/ 14 июня 2019

Вот два конкурирующих подхода к рассмотрению

  1. Не связывайтесь с потоками вообще.Просто сделайте одну петлю, которая спит каждые 60 секунд, как сейчас.Отслеживайте время последней отправки данных о влажности.Если прошло 600 секунд, отправьте его.В противном случае пропустите его и идите спать на 60 секунд.Примерно так:

    from datetime import datetime, timedelta
    
    def iothub_client_telemetry_sample_run():
        last_humidity_run = None
        humidity_period = timedelta(seconds=600)
        client = iothub_client_init()
        while True:
            now = datetime.now()
            send_temperature_data(client)
    
            if not last_humidity_run or now - last_humidity_run >= humidity_period:
                send_humidity_data(client)
                last_humidity_run = now
    
            time.sleep(60)
    
  2. Переименовать iothub_client_telemetry_sample_run в temperature_thread_func или что-то в этом роде.Создайте отдельную функцию, которая выглядит как влажность.Создайте два потока из основной функции вашей программы.Установите их в режим демона, чтобы они отключались при выходе пользователя

    from threading import Thread
    
    def temperature_thread_func():
        client = iothub_client_init()
        while True:
            send_temperature_data(client)
            time.sleep(60)
    
    def humidity_thread_func():
        client = iothub_client_init()
        while True:
            send_humidity_data(client)
            time.sleep(600)
    
    if __name__ == '__main__':
        temp_thread = Thread(target=temperature_thread_func)
        temp_thread.daemon = True
    
        humidity_thread = Thread(target=humidity_thread_func)
        humidity_thread.daemon = True
    
        input('Polling for data. Press a key to exit')
    

Примечания:

  • Если вы решили использовать потокирассмотрите возможность использования события , чтобы завершить их чисто.
  • time.sleep не является точным способом сохранить время.Вам может понадобиться другой механизм синхронизации, если образцы должны быть взяты в точные моменты.
0 голосов
/ 14 июня 2019

Может быть проще сделать что-то вроде

while True:
    loop_b()
    for _ in range(10):
        loop_a()
        time.sleep(60)

или даже

while True:
    time.sleep(1)
    now = time.time()
    if now % 60 == 0:
        loop_a()
    if now % 600 == 0:
        loop_b()

Но если вы действительно хотите использовать потоки, то:

import threading

class LoopAThread(threading.Thread):
    def run(self):
        loop_a()
class LoopBThread(threading.Thread):
    def run(self):
        loop_b()
...

thread_a = LoopAThread()
thread_b = LoopBThread()
thread_a.start()
thread_b.start()
thread_a.join()
thread_b.join()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...