Расписание периодов c неблокирующих задач в python без работы с GIL - PullRequest
0 голосов
/ 26 апреля 2020

Я использовал Python модуль schedule для планирования перекрывающихся задач, которые не блокируют друг друга; однако рекомендуемый способ достижения неблокирующего поведения - использовать модуль python threading (который, конечно, уязвим для GIL ).

Цитирование из модуля расписания документы :

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(1).seconds.do(run_threaded, job)
schedule.every(2).seconds.do(run_threaded, job)
schedule.every(3).seconds.do(run_threaded, job)
schedule.every(4).seconds.do(run_threaded, job)
schedule.every(5).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)

Как нам планировать и выполнять задачи, не беспокоясь о GIL ?

1 Ответ

0 голосов
/ 26 апреля 2020

Как мы планируем и выполняем задачи, не беспокоясь о GIL ?

Одним из относительно простых способов решения этой проблемы является использование многопроцессорного модуля. ... это дает нам cron-подобное поведение, не беспокоясь о проблемах GIL ...

Избегать проблем GIL с многопроцессорной обработкой - ничего инновационного ... Я просто документирую здесь в надежде помочь будущим гуглерам ...

from multiprocessing import Process
from datetime import datetime
import time

from schedule import Scheduler

class MPScheduler(Scheduler):
    def __init__(self, args=None, kwargs=None):
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        super(MPScheduler, self).__init__(*args, **kwargs)
        # Among other things, this object inherits self.jobs (a list of jobs)
        self.args = args
        self.kwargs = kwargs
        self.processes = list()

    def _mp_run_job(self, job_func):
        """Spawn another process to run the job; multiprocessing avoids GIL issues"""
        job_process = Process(target=job_func, args=self.args,
            kwargs=self.kwargs)
        job_process.daemon = True
        job_process.start()
        self.processes.append(job_process)

    def run_pending(self):
        """Run any jobs which are ready"""
        runnable_jobs = (job_obj for job_obj in self.jobs if job_obj.should_run)
        for job_obj in sorted(runnable_jobs):
            job_obj.last_run = datetime.now()   # Housekeeping
            self._mp_run_job(job_obj.job_func)
            job_obj._schedule_next_run()        # Schedule the next execution datetime

        self._retire_finished_processes()

    def _retire_finished_processes(self):
        """Walk the list of processes and retire finished processes"""
        retirement_list = list()   # List of process objects to remove
        for idx, process in enumerate(self.processes):
            if process.is_alive():
                # wait a short time for process to finish
                process.join(0.01)
            else:
                retirement_list.append(idx)

        ## Retire finished processes
        for process_idx in sorted(retirement_list, reverse=True):
            self.processes.pop(process_idx)

def job(id, hungry=True):
    print("{} running {} and hungry={}".format(datetime.now(), id, hungry))
    time.sleep(10)   # This job runs without blocking execution of other jobs

if __name__=='__main__':
    # Build a schedule of overlapping jobs...
    mp_sched = MPScheduler()
    mp_sched.every(1).seconds.do(job, id=1, hungry=False)
    mp_sched.every(2).seconds.do(job, id=2)
    mp_sched.every(3).seconds.do(job, id=3)
    mp_sched.every(4).seconds.do(job, id=4)
    mp_sched.every(5).seconds.do(job, id=5)

    while True:
        mp_sched.run_pending()
        time.sleep(1)
...