sqlalchemy with_for_update причина, по которой select не работает в сеансе - PullRequest
1 голос
/ 11 апреля 2019

Я использую флягу + MySQL и сельдерей + Redis.

У меня есть модели Backend и Item с отношением ForeignKey.

У меня есть код, подобный приведенному ниже, который я запускаю в задаче:

@task()
def my_task():
  while True:
    backends = db.session.query(Backend).filter(Item.in_use==False)

    for backend in backends:
       # do some staff

       # select Item again and set it in_use
       try:
           item = db.session.query(
               Item
           ).with_for_update().filter_by(
               backend=backend,
               in_use=False
            ).order_by(func.rand()).first()
            if item:
                item.in_use = True
                backend.usage = Backend.usage+1
                db.session.add(item)
                db.session.add(backend)
                db.session.commit()
       except exc.OperationalError as e:
          # catching deadlock
          pass

       db.session.commit()
    time.sleep(30)
    if item:
       item.in_use = False
       db.session.add(item)
       db.session.commit()
       break

Если я запускаю 5-10 параллельных задач, он работает нормально. Но когда я запускаю 100 задач в конце, у меня есть 10-15 задач, которые будут работать вечно. Потому что запрос:

backends = db.session.query(Backend).filter(Item.in_use==False)

Возвращает всегда 0 бесплатно Backends.

В MYSQL Я вижу, что все предметы были выпущены. Я могу сохранить их, изменить статус, но запросы внутри задачи продолжают возвращаться 0 Items

Я запускаю задачу так:

@task
def run_tasks():
    g = group(my_task.s() for _ in range(0, count))
    g()

run_tasks.delay()

Если я сделаю:

SHOW PROCESSLIST;

Я вижу некоторые процессы сна.

enter image description here

...