Воспроизведение данных на основе таймера через python веб-сокет - PullRequest
1 голос
/ 30 января 2020

В настоящее время я работаю над клиентом IoT Mockup Client, который считывает в пару реальных CSV-данных временных рядов и «воспроизводит» их на основе их метки времени и системных часов (модуль threading).

В общем, у меня есть Pandas Dataframe, который содержит все старые данные, и функция-обработчик таймера, которая извлекает из него соответствующие строки, чтобы передать их любому приемнику данных.

Это прекрасно работает, если мой таймер- тик-обработчик использует requests.post(.., а затем просто публикует текстовое тело, полученное из df[...<current-timeslice-filer>...].to_csv().

Теперь я хочу передать эти данные на серверный API, поэтому мы решили передавать данные через Websockets, а не через HTTP- ОСТАТОК. Там все становится сложнее. Модуль websockets сильно зависит от asyncio, для которого требуется собственное событие l oop. Поскольку мой таймер уже является своего рода событием l oop (на основе threading.timer), и я должен признать, что я не до конца понимаю концепции, касающиеся asyncio, я думаю, что это не совсем соответствует друг другу.

По крайней мере, я не знаю, как интегрировать метод websocket.send () в мой метод-обработчик, чтобы он выполнялся внутри события asyncio l oop.

DataFrame.to_csv (... можно получить обработчик файла file_or_buf, и я был бы признателен за использование веб-сокета, аналогичного обработчику файла, и предоставление его здесь для передачи sh моих данных через.

  • Есть ли другая реализация websocket в python, которая использует эту парадигму?
  • Может ли модуль websockets использоваться для достижения этой цели? Я просто ошибаюсь?
  • Должен ли я реализовать свой временной интервал обработчик отправки данных на основе также с asyncio, так что оба работает внутри события l oop?

РЕДАКТИРОВАТЬ, что у меня есть до сих пор ...

Это мой класс таймера, который вызывает метод do() каждые interval сек nds

from threading import Thread,Event

class TimeTicker(Thread):
    def __init__(self, interval=1):
        Thread.__init__(self)
        self.interval = interval
        self.stopped = Event()

    def run(self):
        while not self.stopped.wait(self.interval):
            self.do()

    def do(self):
        print('tick')

    def get_stopflag(self):
        return self.stopped

Теперь фрагмент базы c для использования websockets и asyncio - это ...

import asyncio
import websockets

async def hello():
    uri = "ws://echo.websocket.org"
    async with websockets.connect(uri) as websocket:
        await websocket.send(thread.stream())
        r = await websocket.recv()
        print(r)

asyncio.get_event_loop().run_until_complete(hello())

Я уже пытался создать свой do() метод async, но я не могу инициализировать свой класс TimeTicker внутри события asyncio l oop, так что вызовы методов 'ожидаются'

Чтобы не усложнять ситуацию, я бы хотел инициализировать соединение websocket вне объекта TimeTicker (он должен предоставлять только данные временных рядов каждую секунду и передавать их в websocket.send(). Тем не менее, я не уверен, где этот проход данных должен произойти тогда. Также может быть лучшее решение моего класса TimeTicker для yield данных каждую секунду вместо простого вызова метода. В любом случае, я бы хотел получить совет по этому вопросу.

Подсказка: TimeTicker - это только суперкласс в моем классе источника данных, который на самом деле содержит pandas фрейм данных с прибл. 200.000 строк данных временных рядов считываются из CSV в качестве резервуара для отправки.

Решение: на основе ответа @ wowkin2 мой класс TimeTicker теперь реализован только с помощью asyncio ...

import asyncio
import websockets


class TimeTicker:
    is_stopped = False

    def __new__(cls, _loop, _uri, interval=1):
        instance = super().__new__(cls)
        instance.interval = interval
        instance.uri = _uri
        instance.task = _loop.create_task(instance.run())
        return instance.task

    async def run(self):
        async with websockets.connect(self.uri) as self.ws:
            while True:
                await self.do()
                await asyncio.sleep(self.interval)

    async def do(self):
        message = 'ping'
        await self.ws.send(message)
        r = await self.ws.recv()
        print(r)

    def stop(self):
        self.task.cancel()
        self.is_stopped = True

uri = "ws://echo.websocket.org"
loop = asyncio.get_event_loop()
task = TimeTicker(loop, uri, interval=5)
loop.run_until_complete(task)

1 Ответ

1 голос
/ 07 февраля 2020

Вам не нужны многопоточные модули, если вы используете asyncio.

Вот пример того, как периодически что-то делать, используя asyncio. Единственное - у вас должна быть переменная с всегда открытым соединением без контекстного менеджера.

import asyncio


class TimeTicker:
    is_stopped = False

    def __new__(cls, _loop, _ws, interval=1):
        instance = super().__new__(cls)
        instance.interval = interval
        instance.ws = _ws
        instance.task = _loop.create_task(instance.run())
        return instance.task

    async def run(self):
        while True:
            await self.do()
            await asyncio.sleep(self.interval)

    async def do(self):
        message = 'ping'
        await self.ws.send(message)
        r = await self.ws.recv()
        print(r)

    def stop(self):
        self.task.cancel()
        self.is_stopped = True


uri = "ws://echo.websocket.org"
ws = websockets.connect(uri)

loop = asyncio.get_event_loop()
task = TimeTicker(loop, ws, interval=5)
loop.run_until_complete(task)

...