Выполнение цикла Asyncio в течение интервала времени - PullRequest
3 голосов
/ 30 октября 2019

Цель : запуск main(), состоящий из набора асинхронных функций от start_time до end_time

import datetime as dt

start_time, end_time= dt.time(9, 29), dt.time(16, 20)

current_time() просто добавляет текущее время к работепространство. Это время используется в нескольких разных точках скрипта, который здесь не показан

async def current_time(): 
    while True:
        globals()['now'] = dt.datetime.now().replace(microsecond=0)
        await asyncio.sleep(.001)

Еще одна функция, которая что-то делает:

async def balance_check():
    while True:    
        #do something here
        await asyncio.sleep(.001)

main() ожидает предыдущих сопрограмм:

async def main():
    while True:
#Issue:This is my issue. I am trying to make these
#coroutines only run between start_time and end_time. 
#Outside this interval, I want the loop to 
#shut down and for the script to stop.
        if start_time < now.time() < end_time : 

            await asyncio.wait([
                    current_time(),
                    balance_check(),
                    ])
        else:
            print('loop stopped since {} is outside {} and {}'.format(now, start_time, end_time))
            loop.stop() 


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()

Issue : это продолжает работать даже после end_time

Ответы [ 2 ]

2 голосов
/ 31 октября 2019

Проблема заключается в неправильном использовании await asyncio.wait([current_time(), balance_check(),]).

Ожидание asyncio.wait() ожидает указанных ожидаемых значений до complete , то есть либо возвращает значение, либо вызывает исключение. Поскольку ни current_time, ни balance_check никогда не возвращаются из своих бесконечных циклов, выполнение main() никогда не будет передано await asyncio.wait(...), так как это выражение ожидает завершения обоих. В свою очередь, цикл while в main() никогда не достигает своей второй итерации, и loop.stop() не имеет шансов на запуск.

Если код намеревался использовать asyncio.wait(), чтобы дать сопрограммам шансдля запуска, asyncio.wait не является инструментом для этого. Вместо этого можно просто запустить две задачи, вызвав asyncio.create_task(), а затем ничего не делать . Пока цикл событий может работать (то есть он не блокируется вызовом time.sleep() или подобным), asyncio будет автоматически переключаться между сопрограммами, в этом случае current_time и balanced_check с их скоростью ~ 1 миллисекунды. Конечно, вы захотите восстановить контроль к крайнему сроку end_time, поэтому «бездействие» лучше всего выражать одним вызовом asyncio.sleep():

async def main():
    t1 = asyncio.create_task(current_time())
    t2 = asyncio.create_task(balance_check())
    end = dt.datetime.combine(dt.date.today(), end_time)
    now = dt.datetime.now()
    await asyncio.sleep((end - now).total_seconds())
    print('loop stopped since {} is outside {} and {}'.format(now, start_time, end_time))
    t1.cancel()
    t2.cancel()

Обратите внимание, что явный loop.stop()даже не обязательно, потому что run_until_complete() автоматически остановит цикл после завершения данной сопрограммы. Вызов cancel() для задач не имеет практического эффекта, потому что цикл останавливается практически сразу;он включен для выполнения этих задач, чтобы сборщик мусора не предупреждал об уничтожении ожидающих выполнения задач.

Как отмечено в комментариях, этот код не ожидает start_time, нофункциональность легко достигается добавлением еще одного спящего режима перед порождением задач.

1 голос
/ 31 октября 2019

Вы можете использовать синхронизирующие примитивы ', чтобы сделать это, в частности, events.

import datetime as dt
import asyncio

async def function(start_event, end_event):
    # Waits until we get the start event command.
    await start_event.wait()

    # Run our function until we get the end_event being set to true.
    index = 0
    while not end_event.is_set():
        print(f"Function running; index {index}.")
        await asyncio.sleep(2)
        index += 1

async def controller(start_event, end_event):
    # Will usually be this. Commented out for testing purposes. Can be passed by main.
    # Subtraction/addition does not work on dt.datetime(). Inequality does (> or <)
    # now, start, end = dt.datetime.now().time(), dt.time(17, 24), dt.time(18, 0, 30)

    now = dt.datetime.now()
    start, end = now + dt.timedelta(seconds=-10), now + dt.timedelta(seconds=10)

    # Starts our function.
    print(f"Starting at {start}. Ending in {end - now}. ")
    start_event.set()

    # Keeps checking until we pass the scheduled time.
    while start < now < end:
        print(f"Ending in {end - now}.")
        await asyncio.sleep(2)
        now = dt.datetime.now()

    # Ends our function.
    end_event.set()
    print(f"Function ended at {now}.")

async def main():
    # Creates two indepedent events.
    # start_event is flagged when controller is ready for function to be ran (prevents misfires).
    # end_event is flagged when controller see's start < time.now < end (line 30).
    start_event = asyncio.Event()
    end_event = asyncio.Event()

    # Starts both functions at the same time.
    await asyncio.gather(
        controller(start_event, end_event), function(start_event, end_event)
    )

if __name__ == "__main__":
    # Starting our process.
    asyncio.run(main())

Этот метод потребует как function, так и controller, чтобы занять место вasyncio петля. Тем не менее, function будет заблокирован в течение большей части времени, и вместо этого он будет ресурсом цикла переключения контроллера с его циклом while, так что имейте это в виду.

...