Многопоточность с SSE на python - PullRequest
0 голосов
/ 16 июня 2020

Я использую SSE для получения уведомления pu sh от моего сервера leshan lwm2m.

Парадигма leshan заключается в том, что он разделяет поток управления и поток данных.

Поток управления просто HTTP-запрос, в котором вы просите наблюдать за устройством lwm2m.

Плоскость данных - это сеанс sse, используемый для получения уведомлений от сервера о моих iot-устройствах.

Плоскость управления - единственная подключение (по одному на каждый ресурс). Плоскость данных - это один поток со всеми ресурсами внутри.

Итак, я решил, что при первом запросе наблюдения я хочу создать поток, который обрабатывает плоскость данных.

Этот поток должен быть активен до тех пор, пока Меня интересует уведомление об обновлении.

Когда меня больше не интересует поток, он должен присоединиться.

Это код, который я использую для этого:

def observe_resource_sse(address, endpoint , ipsomodel , number_istance , number_resource):
    url_request=address+"/"+endpoint+"/"+ipsomodel+"/"+number_istance+"/"+str(number_resource)+"/observe"
    print(url_request)
    requests.post(url_request,"content=TLV", stream=True,   allow_redirects=True, headers={'Accept': 'text/event-stream'}, timeout=10000)
    path = address+"/event?ep="+endpoint
    path1 = path.replace("/api/clients" , "")
    print(path1)
    global number_observe
    if number_observe == 0:
        number_observe = number_observe + 1
        print("launching thread")
        thread = Thread(target=handler_subscriptions , args= {address, endpoint , path1})
        return thread
    else: return 0

def handler_subscriptions( address , endpoint , path1) :
    print("PATH1: "+ path1)
    client = SSEClient(path1)
    for event in client:
        if "ep" in event.data:
            print(endpoint)
            print(event.data)
            var = json.loads(event.data)
            print(var)
            if var["ep"]==endpoint:
                print(var["val"]["id"],var["val"]["value"])

Главное это:

thread = collector.subscribe_to_events(0 , None , "Sensor_Value" , None)
    thread.start()
    i=0
    while i<10:
        i=i+1
        print(i)
    collector.end_subscription(0 , None , "Sensor_Value", None)
    thread.join()

Я получаю вывод, что выполняется while в основном, а затем поток выполняется, но подписка на события завершена, поэтому он остается заблокированным в для ожидания событий, которые не прибудут и поэтому не присоединятся.

Какое решение для этого?

...