Задача Celery на разных серверах, запускающихся одновременно - PullRequest
0 голосов
/ 11 июля 2019

У меня есть два экземпляра кода Django Celery, запущенных на двух разных серверах для избыточного доступа к общей базе данных на другом сервере. Я заметил, что сельдерей побеждает запуск одной и той же задачи одновременно на двух серверах, когда работа отправляется пользователями. Это создает условия гонки и обновляет базу данных дважды. Как предотвратить это, удерживая задачу на одном сервере, пока на другом сервере запущена другая аналогичная задача?

1 Ответ

0 голосов
/ 12 июля 2019

Вам нужно будет создать блокировку для предотвращения одновременного выполнения двух задач, в документации по сельдерею есть страница http://ask.github.io/celery/cookbook/tasks.html с примером того, как это сделать.Будьте осторожны, ваша реализация не застревает в какой-то мертвой блокировке, и вы устанавливаете тайм-аут на свои блокировки, чтобы в случае сбоя работника он не удерживался на блокировке бесконечно.

# Example from the link above
from celery.task import Task
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from djangofeeds.models import Feed

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes

class FeedImporter(Task):
    name = "feed.import"

    def run(self, feed_url, **kwargs):
        logger = self.get_logger(**kwargs)

        # The cache key consists of the task name and the MD5 digest
        # of the feed URL.
        feed_url_digest = md5(feed_url).hexdigest()
        lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)

        # cache.add fails if if the key already exists
        acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        release_lock = lambda: cache.delete(lock_id)

        logger.debug("Importing feed: %s" % feed_url)
        if acquire_lock():
            try:
                feed = Feed.objects.import_feed(feed_url)
            finally:
                release_lock()
            return feed.url

        logger.debug(
            "Feed %s is already being imported by another worker" % (
                feed_url))
        return

Для этого шаблона требуется сервер кэша, который используется для получения блокировки.Когда задача запускается, она получает блокировку на сервере кэша на основе ключа, такого как «my_task», затем, когда ваша задача завершается, она снимает эту блокировку.Любые другие задачи, которые запускаются, возможно, должны иметь цикл while, который ожидает, пока он не сможет получить блокировку.Блокировки Redis являются атомарными, что означает, что операции, которые получают блокировку, не происходят одновременно, и только одна задача сможет получить блокировку.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...