Я создаю задачу (подклассом celery.task.Task), которая создает соединение с потоковым API Twitter.Для вызовов Twitter API я использую tweepy.Как я прочитал в документации по сельдерею, «задача не создается для каждого запроса, а регистрируется в реестре задач как глобальный экземпляр».Я ожидал, что всякий раз, когда я вызову apply_async (или задержку) для задачи, я буду получать доступ к задаче, которая была первоначально создана, но этого не происходит.Вместо этого создается новый экземпляр пользовательского класса задач.Мне нужно иметь доступ к исходной пользовательской задаче, так как это единственный способ завершить исходное соединение, созданное вызовом tweepy API.
Вот небольшой фрагмент кода, если это поможет:
from celery import registry
from celery.task import Task
class FollowAllTwitterIDs(Task):
def __init__(self):
# requirements for creation of the customstream
# goes here. The CustomStream class is a subclass
# of tweepy.streaming.Stream class
self._customstream = CustomStream(*args, **kwargs)
@property
def customstream(self):
if self._customstream:
# terminate existing connection to Twitter
self._customstream.running = False
self._customstream = CustomStream(*args, **kwargs)
def run(self):
self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()
self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]
А для просмотра Django
def connect_to_twitter(request):
if request.method == 'POST':
do_stuff_here()
.
.
.
follow_all_twitterids.apply_async(args=[], kwargs={})
return
Любая помощь будет оценена.: D
РЕДАКТИРОВАНИЕ:
Для дополнительного контекста вопроса объект CustomStream создает экземпляр httplib.HTTPSConnection при каждом вызове метода filter ().Это соединение должно быть закрыто всякий раз, когда есть другая попытка его создать.Соединение закрывается установкой для customstream.running значения False.