Я использую флягу + MySQL и сельдерей + Redis.
У меня есть модели Backend
и Item
с отношением ForeignKey
.
У меня есть код, подобный приведенному ниже, который я запускаю в задаче:
@task()
def my_task():
while True:
backends = db.session.query(Backend).filter(Item.in_use==False)
for backend in backends:
# do some staff
# select Item again and set it in_use
try:
item = db.session.query(
Item
).with_for_update().filter_by(
backend=backend,
in_use=False
).order_by(func.rand()).first()
if item:
item.in_use = True
backend.usage = Backend.usage+1
db.session.add(item)
db.session.add(backend)
db.session.commit()
except exc.OperationalError as e:
# catching deadlock
pass
db.session.commit()
time.sleep(30)
if item:
item.in_use = False
db.session.add(item)
db.session.commit()
break
Если я запускаю 5-10 параллельных задач, он работает нормально. Но когда я запускаю 100 задач в конце, у меня есть 10-15 задач, которые будут работать вечно. Потому что запрос:
backends = db.session.query(Backend).filter(Item.in_use==False)
Возвращает всегда 0 бесплатно Backends
.
В MYSQL Я вижу, что все предметы были выпущены. Я могу сохранить их, изменить статус, но запросы внутри задачи продолжают возвращаться 0 Items
Я запускаю задачу так:
@task
def run_tasks():
g = group(my_task.s() for _ in range(0, count))
g()
run_tasks.delay()
Если я сделаю:
SHOW PROCESSLIST;
Я вижу некоторые процессы сна.