Пользовательский класс задачи Celery не работает должным образом - PullRequest
0 голосов
/ 21 мая 2018

Я изо всех сил пытаюсь получить работу Сельдерея.Я новичок в Python в целом и, очевидно, в Celery, и я пытаюсь получить базовый пример работы.Я хотел бы запустить фоновую задачу, которая должна сохранить его состояние, пока оно живо.Поэтому я пытаюсь реализовать базовый пример увеличения целочисленной переменной при вызове из клиентского скрипта.Я работаю над Raspberry Pi с малиновым изображением.Вот код моей рабочей задачи:

from celery import Task, registry, Celery
import celery

celery = Celery('tasks', broker='redis://localhost:6379',backend='redis://localhost:6379')

class MyTask(celery.Task):
        a = 0
        def __init__(self):
                self.a = 0

        def increment(self, x):
                self.a += x
                return self.a
        @property
        def a(self):
                return a

@celery.task(Base=MyTask)
def mytask(x):
        mytask.increment(x)
        return mytask.a

И это вызывающий скрипт:

from tasks import mytask

result = mytask.delay(2)
print result.get(timeout=1)

, который выдает следующую ошибку:

[2018-05-21 15:08:23,889: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x7610cdf0> (args:('tasks.mytask', '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', {'origin': 'gen27381@raspberrypi', 'lang': 'py', 'task': 'tasks.mytask', 'group': None, 'root_id': '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': 'celery', u'exchange': u''}, 'expires': None, u'correlation_id': '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', 'retries': 0, 'timelimit': [None, None], 'argsrepr': '(2,)', 'eta': None, 'parent_id': None, u'reply_to': 'fdebe38f-7353-3ef2-920b-74f76a294ff7', 'id': '3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2', 'kwargsrepr': '{}'}, '[[2], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-05-21 15:08:23,894: DEBUG/MainProcess] Task accepted: tasks2.mytask[3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2] pid:27366
Traceback (most recent call last):
  File "test2.py", line 4, in <module>
    print result.get(timeout=1)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 194, in get
    on_message=on_message,
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/async.py", line 191, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 299, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 292, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
  File "<string>", line 1, in reraise
celery.backends.base.AttributeError: 'mytask' object has no attribute 'increment'
[2018-05-21 15:08:23,910: ERROR/ForkPoolWorker-2] Task tasks.mytask[3ae83225-faee-4dc0-a6c1-1a7e7e5eaca2] raised unexpected: AttributeError("'mytask' object has no attribute 'increment'",)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/pi/example/tasks.py", line 20, in mytask
    mytask.increment(x)
  File "/usr/local/lib/python2.7/dist-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
AttributeError: 'mytask' object has no attribute 'increment'

Рабочий добавленс помощью команды:

сельдерей -А работник задач --loglevel = debug

и тестовый скрипт просто с

python test.py

Я также попытался получить доступ к переменной «a» напрямую и в результате получил ту же ошибку.Очевидно, что я делаю что-то очень простое неправильно, но я не могу понять, что.

1 Ответ

0 голосов
/ 21 мая 2018

Хорошо, проблема действительно была очень простой.Синтаксис для определения пользовательского класса задачи - это не ChildClass (Base = ParentClass), а ChildClass (base = ParentClass) ... Я, должно быть, скопировал его из какого-то примера, и интерпретатор никогда не думал жаловаться на это.Есть также несколько других проблем с примером в вопросе. Вот код, который на самом деле делает то, что должен был сделать:

    from celery import Task, registry, Celery
    import celery


    class MyTask(celery.Task):
            a = 0

            def increment(self, x):
                    self.a += x
                    return self.a

    celery = Celery('tasks', broker='redis://localhost:6379',backend='redis://localhost:6379')
    @celery.task(base=MyTask)
    def fookyou(x):
            val = fookyou.increment(x)
            return val

Обратите внимание, что я изменил также имя задачи в случае, если оно смешивалось с пользовательскимНазвание класса, когда единственной разницей были заглавные буквы.По-видимому, слишком сложно просить переводчика определить ошибки такого типа ...

...