Celery period_task работает несколько раз параллельно - PullRequest
3 голосов
/ 11 октября 2011

У меня есть очень простой периодический код, использующий потоки Celery;он просто печатает «Pre» и «Post» и спит между ними.Он адаптирован из этого вопроса StackOverflow и этого связанного веб-сайта

from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task

def single_instance_task(timeout):
  def task_exc(func):
    def wrapper(*args, **kwargs):
        lock_id = "celery-single-instance-" + func.__name__
        acquire_lock = lambda: cache.add(lock_id, "true", timeout)
        release_lock = lambda: cache.delete(lock_id)
        if acquire_lock():
            try:
                func()
            finally:
                release_lock()
    return wrapper
  return task_exc

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

Этот код никогда печатает 'already in use...';то же самое происходит, когда я использую декоратор @single_instance_task.

Знаете, что не так?

Редактировать: я упростил вопрос, чтобы он не писал впамять (используя глобальный или кеш django);Я до сих пор никогда не вижу 'already in use...'


Редактировать: Когда я добавляю следующий код в мой файл settings.py Django (изменяя код с https://docs.djangoproject.com/en/dev/topics/cache/ всеработает, как и ожидалось, , но только когда я использую порт 11211 (как ни странно, мой сервер находится на порте 8000)

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}

1 Ответ

3 голосов
/ 11 октября 2011

Как вы работаете с сельдереем? Я не знаком с резьбовым вариантом.

Если он работает с несколькими процессами, то нет никаких «глобальных» переменных, разделяющих память между рабочими.

Если вы хотите, чтобы счетчик распределялся между всеми работниками, я бы посоветовал вам использовать cache.incr.

например:.

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2

Обновление

Что произойдет, если вы заставите свои задачи накладываться на сон, например ::

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

Если вы не получаете «уже используется ...» от запуска этого более часто, чем 20 секунд, то вы знаете, что кеш работает не так, как ожидалось.


Другое обновление

Вы установили кеш-бэкэнд в настройках django? Например. Memcached

Если нет, возможно, вы используете Dummy Cache , , который фактически не выполняет никакого кеширования, просто реализует интерфейс ..., который звучит как убедительная причина вашего проблема.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...