Python очередь asyncio использовать одновременно получение / размещение - PullRequest
2 голосов
/ 16 июня 2020

Я хотел бы запустить сценарий Python, который может имитировать данные временных рядов реального мира и обрабатывать данные в реальном времени. То есть из заданного набора данных (который составляет ~ 8 часов измерения) я хочу получать данные длиной в одну секунду каждую секунду и обрабатывать, как только каждая секунда данных будет прочитана. В реальном мире данные будут собираться каждую секунду с помощью некоторых детекторов. Для этой задачи я решил использовать модуль Python asyncio. Вот в основном то, что я придумал.

import scipy
import numpy as np
import asyncio
import time
import queue

q = queue.Queue()

async def get_data (data):

    while True:
        await asyncio.sleep(1)
        q.put(data[idx,:])
        idx += 1
        #Each row of data is read every second. 

async def run_algorithm ():

    while True:

        if q.empty() == True:
            await asyncio.sleep(1)

        data_read = q.get(block = False)

        #I do something here with the read data


async def main (data):

    feed_data = asyncio.create_task(get_data(data))
    process_data = asyncio.create_task(run_algorithm ())
    await asyncio.gather(feed_data, process_data)

Хотя кажется, что все работает нормально, проблема в том, что # Я что-то делаю здесь с частью чтения данных. Вот где я использую свой алгоритм для обработки данных в реальном времени. Когда я использую быстрый алгоритм, он работает очень хорошо. Он считывает данные каждую секунду, как и должно, и функция run_algorithm ждет секунду, если в очереди нет данных.

Однако, когда у меня медленный алгоритм, он не считывает данные каждую секунду . Если для выполнения алгоритма требуется 0,5 секунды, то следующие данные будут считаны через 1,5 секунды вместо 1 секунды.

Это похоже на то, что функция get_data замедляется по мере замедления части run_algorithm. Есть ли способ заставить get_data читать данные каждую секунду независимо от того, сколько времени занимает часть run_algorithm?

Ответы [ 2 ]

3 голосов
/ 16 июня 2020

Как указано в комментариях, вам необходимо использовать очередь asyncio, и в этом случае вам не нужны спящие в run_algorithm:

q = asyncio.Queue()

async def get_data (data):
    while True:
        await asyncio.sleep(1)
        await q.put(data[idx,:])
        idx += 1
        #Each row of data is read every second. 

async def run_algorithm ():
    while True:
        data_read = await q.get()
        #I do something here with the read data

Однако, когда у меня есть медленный алгоритм, он не читает данные каждую секунду

Похоже, ваш алгоритм содержит код, связанный с процессором или иным образом блокирующий код. Поскольку asyncio является однопоточным, это вызывает остановку события l oop. Чтобы исправить это, вы должны запустить код блокировки в отдельном потоке. Asyncio содержит утилиту для этой цели run_in_executor. Например, эмулируя вызов блокировки / привязки к ЦП с помощью time.sleep(1) (намеренно не с использованием asyncio.sleep здесь для имитации блокировки), вот как вы бы вызывали его из asyncio:

        #time.sleep(1)   # this is forbidden
        # instead, invoke blocking code like this:
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, time.sleep, 1)
1 голос
/ 16 июня 2020

может быть что-то вроде:

from time import monotonic

async def get_data (data):
    start = monotonic() 
    for i in range(data.shape[0]):
        delay = start + i - monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        q.put(data[i,:])

т.е. отслеживать, когда вы начали, затем просто спите на оставшуюся задержку. обратите внимание, что это приведет к добавлению большого количества элементов, если потребитель будет слишком медленным для одного из них

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...