Я уже давно борюсь с той же проблемой, и мне все труднее добиваться значимого прогресса.Написав этот вопрос, я надеюсь найти потенциальное решение или собрать новые идеи, чтобы продолжить пробовать себя.
На высоком уровне это то, чего я хотел бы достичь:
Я хотел бы смоделировать развертывание IoT небольшого масштаба, состоящее из примерно 50 устройств.У меня есть доступ к показаниям датчиков с помощью API потоковой передачи на основе веб-сокета, с частотой примерно одно чтение (на датчик) в минуту.
В дополнение к обработке этих данных каждый датчик должен взаимодействовать (полнодуплексный) с централизованным узлом-координатором, задачей которого является объединение данных и распространение результатов.
Надеюсь, это описание достаточно хорошее, чтобы дать некоторое представление о том, чего я пытаюсь достичь.
Для простоты тестирования я хотел бы смоделировать каждый компонент как объект спростой интерфейс, инкапсулирующий операции: Send / Receive / Broadcast.
class Sensor(object):
Receive : Receive sensor readings from the streaming API
Send : Send data to the coordinator
Receive : Receive data from the coordinator
class Coordinator(object)
Send : Send data to a specific sensor
Broadcast : Send data to all connected sensors
Receive : Receive data from a connected sensor
До сих пор в большинстве моих работ использовались библиотеки asyncio и websockets .
Последний подход
Недавно я потратил еще немного времени на чтение структуры asyncio и попытался сделать следующее.Идея здесь заключается в том, что, вводя общий EventLop (планировщик) в каждый объект, я могу получить нужную мне абстракцию.
Датчик
import asyncio
from Client import Client
from Readings import Reading
class Sensor(object):
def __init__(self):
self.scheduler = asyncio.get_event_loop()
self.urban_api = None # Websocket streaming API
self.readings = Reading(self.scheduler, self.urban_api)
self.coordinator = Client(self.scheduler, host='localhost', port=8080)
self.scheduler.create_task(self.repetitive_message())
try:
self.scheduler.run_forever()
except KeyboardInterrupt:
pass
async def called_on_new_reading():
# Called on receipt of new reading.
@staticmethod
async def repetitive_message():
while True:
print('Sensor performing work')
await asyncio.sleep(2)
if __name__ == '__main__':
sensor = Sensor()
Чтение - API обработки потоков
from websockets import connect
class Reading(object):
def __init__(self, scheduler, urban_api):
self.scheduler = scheduler
self.urban_api = urban_api
self.scheduler.create_task(self.receive())
async def receive(self):
async with connect(self.urban_api) as websocket:
async for message in websocket:
print(message)
Я не добился никакого прогресса в реализации координатора с момента перехода на websockets , но я надеюсь, что намерениеЧисто.
У меня есть несколько вопросов:
- Это правильная стратегия для достижения симуляции, которую я описал?
- Существуют ли библиотеки, которые могут это упростить?дальше?
- Как я могу получить показания из веб-сокета в моем классе датчиков?
Мне не удалось найти какие-либо примеры ООП-асинхронного программирования - возможно, я что-то упустил?
Я ценю, что этот пост довольно длинный и не содержит частей, но любая обратная связь или указатели будут с благодарностью.