Обновите все устройства-близнецы в Azure-iot-Hub (Python) - PullRequest
0 голосов
/ 31 октября 2019

Я пытаюсь написать программу на python, которая обновляет все устройства-близнецы данного IoT-концентратора, только имея строку подключения концентратора. Код, приведенный здесь: Начало работы с двойниками устройств (Python) не актуально и дает сбой. У кого-нибудь есть успешный опыт работы с такой работой?

Ответы [ 2 ]

1 голос
/ 01 ноября 2019

В дополнение к комментарию @ silent в концентратор IoT Azure встроено специальное задание для массового импорта и экспорта идентификаторов устройств IoT-концентратора.

Эта функция позволяет обновить устройство-близнец, включив в него также сообщаемые свойства. Массовое задание передается службе REST API Создать задание импорта-экспорта .

Массовое задание описывается в файле большого двоичного объекта для каждого устройства построчно, следующая строка является примером для updateTwin на device1 :

{ "id":"device1", "importMode":"updateTwin", "status":"enabled", "tags":{},"properties":{"desired":{ "key1":12},"reported":{ "key2":null, "key3":"abcd"} }}

В следующем фрагменте кода показано, как можно вызвать эту службу REST для обновления всех двойников устройств, описанных в inputBlobName :

import requests
import json
import time
import urllib
import hmac
import hashlib
import base64


# Example of the blob content for one device
# { "id":"device1", "importMode":"updateTwin", "status":"enabled", "tags":{},"properties":{"desired":{ "key1":12},"reported":{ "key2":null, "key3":"abcd"  } } }


# IoT Hub and Blob Storage
iothub_connection_string = "<IoTHubConnectionString>"
blobContainerUri = "<blobContainerUriSaS>"
inputBlobName = "<inputBlobName>"

def get_sas_token(resource_uri, sas_name, sas_value):  
    sas = base64.b64decode(sas_value.encode('utf-8'))
    expiry = str(int(time.time() + 10000))
    string_to_sign = (resource_uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
    return  "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(resource_uri, signature, expiry, sas_name)

# iothub_sas_token from connection string
cs = dict(map(lambda x: x.split('=',1), iothub_connection_string.split(';')))
iothub_namespace = cs["HostName"].split('.')[0]
sas_token = get_sas_token(cs["HostName"], cs["SharedAccessKeyName"], cs["SharedAccessKey"])

# REST API see doc: https://docs.microsoft.com/en-us/rest/api/iothub/digitaltwinmodel/service/createimportexportjob
uri = "https://{}.azure-devices.net/jobs/create?api-version=2018-06-30".format(iothub_namespace)
headers = { 'Authorization':sas_token, 'Content-Type':'application/json;charset=utf-8' }
payload = { "inputBlobContainerUri": blobContainerUri, "outputBlobContainerUri": blobContainerUri, "inputBlobName": inputBlobName, "type":"import" }
res = requests.post(uri, data = json.dumps(payload), headers = headers)

print(res)

# check if the job has been accepted
if res.status_code != 200: 
    quit()

# check the job status progress 
jobId = json.loads(res.content)["jobId"]
uri = "https://{}.azure-devices.net/jobs/{}?api-version=2018-06-30".format(iothub_namespace, jobId);
while(True): 
    time.sleep(2)    
    res = requests.get(uri, data = None, headers = headers)
    if  res.status_code == 200 and json.loads(res.content)["status"] == "running": 
        print(".", end="", flush=True)
    else: 
        break
print("Done")

Для создания контейнера BLOB-объектов, генерирующих его sasадрес URI и т. д. можно использовать Microsoft Azure Storage Explorer .

0 голосов
/ 04 ноября 2019

С помощью @Roman Kiss я использовал запрос API create API - который не требует использования хранилища.

JOB_ID = {INSERT_JOB_ID}
CONNECTION_STRING = {INSERT_CONNECTION_STRING}
IOT_HUB_NAME = {INSERT_IOT_HUB_NAME}

def create_device_twin():
    return {"jobId": JOB_ID, "type": "scheduleUpdateTwin", "updateTwin": {"properties": {
        "desired": {INSERT_YOUR_DESIRED_PROERTIES}
    }, "etag": "*"}
     }

def get_sas_token():
    cs = dict(map(lambda x: x.split('=', 1), CONNECTION_STRING.split(';')))
    resource_uri = cs["HostName"]
    sas_name = cs["SharedAccessKeyName"]
    sas_value = cs["SharedAccessKey"]
    sas = base64.b64decode(sas_value.encode('utf-8'))
    expiry = str(int(time.time() + 10000))
    string_to_sign = (resource_uri + '\n' + expiry).encode('utf-8')
    signed_hmac_sha256 = hmac.HMAC(sas, string_to_sign, hashlib.sha256)
    signature = urllib.parse.quote(base64.b64encode(signed_hmac_sha256.digest()))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(resource_uri, signature, expiry, sas_name)

if __name__ == '__main__':
    uri = "https://{}.azure-devices.net/jobs/v2/{}?api-version=2018-06-30".format(IOT_HUB_NAME, JOB_ID)
    sas_token = get_sas_token()
    headers = {'Authorization': sas_token, 'Content-Type': 'application/json;charset=utf-8'}
    res = requests.put(uri, headers=headers, data=json.dumps(create_device_twin()))
...