Задача сельдерея, вызывающая себя после того, как задача успешно выполняется без сельдерея - PullRequest
1 голос
/ 18 апреля 2019

Я хочу вызывать свою задачу сельдерея иногда, как каждые 30 минут после выполнения текущей задачи, но иногда задача занимает больше времени, чем ожидалось, из-за задачи загрузки файлов с удаленного сервера.Так что я не хочу использовать celeryBeat.Кроме того, используя себя.Повторите это только для, когда ошибка произошла, я полагаю.Вот моя задача:

@shared_task(name="download_big", bind=True, acks_late=true, autoretry_for=(Exception, requests.exceptiosn.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3}):
def download_big(self):
    my_file = session.get('example.com/hello.mp4')
    if my_file.status_code == requests.codes["OK"]:
        open("hello.mp4", "wb").write(my_file.content)
    else:
        self.retry()

Обновление:

Ну, я изменил свою структуру следующим образом:

@shared_task(name="download_big", bind=True, acks_late=true, autoretry_for=(Exception, requests.exceptiosn.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3}):
def download_big(url):
    my_file = session.get(url, name)
    if my_file.status_code == requests.codes["OK"]:
        open(name, "wb").write(my_file.content)
    else:
        self.retry()

@shared_task(name="download_all", bind=True, acks_late=true, autoretry_for=(Exception, requests.exceptiosn.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3}):
def download_all(self):
    my_list = [...]  # bunch of urls with names
    jobs = []
    for name, url in my_list:
        jobs.append(download_big.si(url, name))
    group(jobs)()

Так что в этом случае мне нужно вызвать метод download_allвместо download_big, таким образом, я могу загружать файл параллельно, и когда все групповые задачи выполнены, его нужно снова вызывать через 30 минут.

1 Ответ

0 голосов
/ 18 апреля 2019

Вы можете попробовать использовать аккорд , который запустит группу задач и, когда они завершатся, запустит обратный вызов, который можно использовать для перепланирования.

, например

from celery import chord

@shared_task(name="download_big", bind=True, acks_late=true, autoretry_for=(Exception, requests.exceptiosn.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3}):
def download_big(url):
    my_file = session.get(url, name)
    if my_file.status_code == requests.codes["OK"]:
        open(name, "wb").write(my_file.content)
    else:
        self.retry()

@shared_task(name="download_all", bind=True, acks_late=true, autoretry_for=(Exception, requests.exceptiosn.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3}):
def download_all(self):
    my_list = [...]  # bunch of urls with names
    jobs = []
    for name, url in my_list:
        jobs.append(download_big.si(url, name))

    # Run the group and reschedule once all tasks complete
    chord(jobs)(download_all.apply_async(countdown=1800))
...