В настоящее время я работаю над клиентом IoT Mockup Client, который считывает в пару реальных CSV-данных временных рядов и «воспроизводит» их на основе их метки времени и системных часов (модуль threading
).
В общем, у меня есть Pandas Dataframe, который содержит все старые данные, и функция-обработчик таймера, которая извлекает из него соответствующие строки, чтобы передать их любому приемнику данных.
Это прекрасно работает, если мой таймер- тик-обработчик использует requests.post(..
, а затем просто публикует текстовое тело, полученное из df[...<current-timeslice-filer>...].to_csv()
.
Теперь я хочу передать эти данные на серверный API, поэтому мы решили передавать данные через Websockets, а не через HTTP- ОСТАТОК. Там все становится сложнее. Модуль websockets
сильно зависит от asyncio
, для которого требуется собственное событие l oop. Поскольку мой таймер уже является своего рода событием l oop (на основе threading.timer
), и я должен признать, что я не до конца понимаю концепции, касающиеся asyncio, я думаю, что это не совсем соответствует друг другу.
По крайней мере, я не знаю, как интегрировать метод websocket.send () в мой метод-обработчик, чтобы он выполнялся внутри события asyncio l oop.
DataFrame.to_csv (... можно получить обработчик файла file_or_buf
, и я был бы признателен за использование веб-сокета, аналогичного обработчику файла, и предоставление его здесь для передачи sh моих данных через.
- Есть ли другая реализация websocket в python, которая использует эту парадигму?
- Может ли модуль
websockets
использоваться для достижения этой цели? Я просто ошибаюсь? - Должен ли я реализовать свой временной интервал обработчик отправки данных на основе также с asyncio, так что оба работает внутри события l oop?
РЕДАКТИРОВАТЬ, что у меня есть до сих пор ...
Это мой класс таймера, который вызывает метод do()
каждые interval
сек nds
from threading import Thread,Event
class TimeTicker(Thread):
def __init__(self, interval=1):
Thread.__init__(self)
self.interval = interval
self.stopped = Event()
def run(self):
while not self.stopped.wait(self.interval):
self.do()
def do(self):
print('tick')
def get_stopflag(self):
return self.stopped
Теперь фрагмент базы c для использования websockets
и asyncio
- это ...
import asyncio
import websockets
async def hello():
uri = "ws://echo.websocket.org"
async with websockets.connect(uri) as websocket:
await websocket.send(thread.stream())
r = await websocket.recv()
print(r)
asyncio.get_event_loop().run_until_complete(hello())
Я уже пытался создать свой do()
метод async
, но я не могу инициализировать свой класс TimeTicker
внутри события asyncio l oop, так что вызовы методов 'ожидаются'
Чтобы не усложнять ситуацию, я бы хотел инициализировать соединение websocket вне объекта TimeTicker (он должен предоставлять только данные временных рядов каждую секунду и передавать их в websocket.send()
. Тем не менее, я не уверен, где этот проход данных должен произойти тогда. Также может быть лучшее решение моего класса TimeTicker для yield
данных каждую секунду вместо простого вызова метода. В любом случае, я бы хотел получить совет по этому вопросу.
Подсказка: TimeTicker - это только суперкласс в моем классе источника данных, который на самом деле содержит pandas фрейм данных с прибл. 200.000 строк данных временных рядов считываются из CSV в качестве резервуара для отправки.
Решение: на основе ответа @ wowkin2 мой класс TimeTicker теперь реализован только с помощью asyncio ...
import asyncio
import websockets
class TimeTicker:
is_stopped = False
def __new__(cls, _loop, _uri, interval=1):
instance = super().__new__(cls)
instance.interval = interval
instance.uri = _uri
instance.task = _loop.create_task(instance.run())
return instance.task
async def run(self):
async with websockets.connect(self.uri) as self.ws:
while True:
await self.do()
await asyncio.sleep(self.interval)
async def do(self):
message = 'ping'
await self.ws.send(message)
r = await self.ws.recv()
print(r)
def stop(self):
self.task.cancel()
self.is_stopped = True
uri = "ws://echo.websocket.org"
loop = asyncio.get_event_loop()
task = TimeTicker(loop, uri, interval=5)
loop.run_until_complete(task)