Вам нужно будет создать блокировку для предотвращения одновременного выполнения двух задач, в документации по сельдерею есть страница http://ask.github.io/celery/cookbook/tasks.html с примером того, как это сделать.Будьте осторожны, ваша реализация не застревает в какой-то мертвой блокировке, и вы устанавливаете тайм-аут на свои блокировки, чтобы в случае сбоя работника он не удерживался на блокировке бесконечно.
# Example from the link above
from celery.task import Task
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from djangofeeds.models import Feed
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
class FeedImporter(Task):
name = "feed.import"
def run(self, feed_url, **kwargs):
logger = self.get_logger(**kwargs)
# The cache key consists of the task name and the MD5 digest
# of the feed URL.
feed_url_digest = md5(feed_url).hexdigest()
lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
# cache.add fails if if the key already exists
acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
release_lock = lambda: cache.delete(lock_id)
logger.debug("Importing feed: %s" % feed_url)
if acquire_lock():
try:
feed = Feed.objects.import_feed(feed_url)
finally:
release_lock()
return feed.url
logger.debug(
"Feed %s is already being imported by another worker" % (
feed_url))
return
Для этого шаблона требуется сервер кэша, который используется для получения блокировки.Когда задача запускается, она получает блокировку на сервере кэша на основе ключа, такого как «my_task», затем, когда ваша задача завершается, она снимает эту блокировку.Любые другие задачи, которые запускаются, возможно, должны иметь цикл while
, который ожидает, пока он не сможет получить блокировку.Блокировки Redis являются атомарными, что означает, что операции, которые получают блокировку, не происходят одновременно, и только одна задача сможет получить блокировку.