Сельдерей создает несколько экземпляров задачи - PullRequest
1 голос
/ 25 октября 2011

Я создаю задачу (подклассом celery.task.Task), которая создает соединение с потоковым API Twitter.Для вызовов Twitter API я использую tweepy.Как я прочитал в документации по сельдерею, «задача не создается для каждого запроса, а регистрируется в реестре задач как глобальный экземпляр».Я ожидал, что всякий раз, когда я вызову apply_async (или задержку) для задачи, я буду получать доступ к задаче, которая была первоначально создана, но этого не происходит.Вместо этого создается новый экземпляр пользовательского класса задач.Мне нужно иметь доступ к исходной пользовательской задаче, так как это единственный способ завершить исходное соединение, созданное вызовом tweepy API.

Вот небольшой фрагмент кода, если это поможет:

from celery import registry
from celery.task import Task

class FollowAllTwitterIDs(Task):
    def __init__(self):
        # requirements for creation of the customstream
        # goes here. The CustomStream class is a subclass
        # of tweepy.streaming.Stream class

        self._customstream = CustomStream(*args, **kwargs)

    @property
    def customstream(self):
        if self._customstream:
            # terminate existing connection to Twitter
            self._customstream.running = False
        self._customstream = CustomStream(*args, **kwargs)

    def run(self):
        self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()

        self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]

А для просмотра Django

def connect_to_twitter(request):
    if request.method == 'POST':
        do_stuff_here()
        .
        .
        .

        follow_all_twitterids.apply_async(args=[], kwargs={})

     return

Любая помощь будет оценена.: D

РЕДАКТИРОВАНИЕ:

Для дополнительного контекста вопроса объект CustomStream создает экземпляр httplib.HTTPSConnection при каждом вызове метода filter ().Это соединение должно быть закрыто всякий раз, когда есть другая попытка его создать.Соединение закрывается установкой для customstream.running значения False.

1 Ответ

0 голосов
/ 26 октября 2011

Задание должно быть создано только один раз, если вы думаете, что это не по какой-то причине, Я предлагаю вам добавить

печать ( "Instantiate") отслеживание импорта traceback.print_stack ()

к методу Task.__init__, чтобы вы могли сказать, где это будет происходить.

Я думаю, что ваша задача может быть лучше выражена так:

from celery.task import Task, task

class TwitterTask(Task):
    _stream = None
    abstract = True

    def __call__(self, *args, **kwargs):
        try:
            return super(TwitterTask, self).__call__(stream, *args, **kwargs)
        finally:
            if self._stream:
                self._stream.running = False

    @property
    def stream(self):
        if self._stream is None:
            self._stream = CustomStream()
        return self._stream

@task(base=TwitterTask)
def follow_all_ids():
    ids = get_list_of_ids_to_follow()
    follow_all_ids.stream.filter(follow=ids, async=false)
...