У меня есть следующая задача сельдерея, и я хотел знать, как изменить состояние по умолчанию при повторном выполнении задачи.Я знаю, чтобы изменить состояние, которое вы обычно задаете в следующей строке self.update_state(state='MY CUSTOM STATE')
, однако, когда код повторяется, кажется, что-то другое.В конце я хотел бы изменить состояние повтора с RETRY на WAITING.
LOCK_EXPIRE = 60 * 2 # Lock expires in 2 minutes
@contextmanager
def memcache_lock(lock_id, oid):
timeout_at = monotonic() + LOCK_EXPIRE - 3
print('in memcache_lock and timeout_at is {}'.format(timeout_at))
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, LOCK_EXPIRE)
try:
yield status
print('memcache_lock and status is {}'.format(status))
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)
@celery.task(bind=True, name='app.myTask1')
def myTask1(self):
self.update_state(state='IN TASK')
print('dir is {} '.format(dir(self)))
lock_id = self.name
print('lock_id is {}'.format(lock_id))
with memcache_lock(lock_id, self.app.oid) as acquired:
print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
if acquired:
# do work if we got the lock
print('acquired is {}'.format(acquired))
self.update_state(state='DOING WORK')
time.sleep(90)
return 'result'
# otherwise, the lock was already in use
raise self.retry(countdown=60) # redeliver message to the queue, so the work can be done later