APScheduler добавляет много рабочих мест одновременно (база данных Jobstore) - PullRequest
0 голосов
/ 05 февраля 2020

Как я могу запланировать много заданий APScheduler (4000+) одновременно? (Я должен запланировать все это после определенных пользовательских событий.)

Итеративный вызов add_job просто занимает слишком много времени для многих заданий. Но когда я пытаюсь использовать AsyncIOScheduler и следующий асинхронный код c, я также не получаю никакого дополнительного увеличения производительности.

ПРИМЕЧАНИЕ: мой планировщик должен подключиться к SQL Магазин заданий через SqlAlchemy

scheduler = AsyncIOScheduler(jobstores={"default": SQLAlchemyJobStore(url="a valid db connection str")})
scheduler.start()

def schedule_jobs_quickly():
    # init lots of (fake) jobs
    jobs = []
    for i in range(3000):
        jobs.append(i)
    send_time = datetime.datetime.now() + datetime.timedelta(days=2)

    # try to schedule jobs concurrently
    start_time = time.time()
    asyncio.get_event_loop().run_until_complete(schedule_all_jobs(jobs, send_time))
    duration = time.time() - start_time
    print(f"Created {len(jobs)} jobs in {duration} seconds")


async def schedule_all_jobs(all_jobs, send_time):
    tasks = []
    for job in all_jobs:
        task = asyncio.ensure_future(schedule_job(job, send_time))
        tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)


async def schedule_job(job, send_time):
    scheduler.add_job(send_email_if_needed, trigger=send_time)

Результат очень медленный. Как ускорить это?

>>> schedule_jobs_quickly()
...
Created 3000 jobs in 401.9982771873474 seconds

Для сравнения, это то, сколько времени потребовалось для BackgroundScheduler() с использованием хранилища заданий памяти по умолчанию:

Created 3000 jobs in 0.9155495166778564 seconds

Итак, это кажется, что соединения с базой данных, которые так дороги. Может быть, есть способ создать несколько заданий, используя одно и то же соединение, вместо повторного подключения для каждого add_job?

1 Ответ

0 голосов
/ 14 февраля 2020

Это не то решение, которое я искал, но я решил отказаться от AsyncIOScheduler и вместо этого запланировать свои многочисленные задачи в отдельном потоке, чтобы остальная часть моей программы могла продолжаться без удержания всеми соединениями с БД. Пример ниже.

from threading import Thread

def schedule_jobs_quickly():
    # init lots of (fake) jobs
    jobs = []
    for i in range(3000):
        jobs.append(i)
    send_time = datetime.datetime.now() + datetime.timedelta(days=2)

    # schedule jobs in new thread
    scheduler_thread = Thread(target=schedule_email_jobs, args=(email_jobs,))
    scheduler_thread.start()


def schedule_email_jobs(jobs):
    for job in jobs:
        scheduler.add_job(send_email, trigger=send_time)

def send_email():
   # sends email 
...