Это в контексте обновления токена доступа для пользователя.Скажем, у нас есть функция refresh_user_token
, которая принимает CustomUser
объект как user
.
def refresh_user_token(user):
...
...
return result
Я бы хотела, чтобы каждое выполнение этой функции для определенного CustomUser
планировало повторениечерез 9 дней.
def refresh_user_token(user):
...
...
next_refresh = datetime.now() + timedelta(days=9)
schedule_refresh(user, scheduled_time=next_refresh)
return result
Большинство случаев использования, которые я вижу для Celery, касаются выполнения пакетных операций, но для этого использования мне нужно иметь возможность выполнять функцию с аргументами, которые, кажется, не соответствуютвозможно с Celery.
Кто-то порекомендовал настроить задание cron для проверки любых токенов, которые нужно обновлять очень быстро, за x секунд.
Итак, на объекте CustomUser
мы имеемDateTimeField
называется last_token_refresh
.
@Celery.task
def refresh_auth_tokens():
users = CustomUser.objects.all()
for user in users:
last_refresh_delta = datetime.now(timezone.utc) - user.last_token_refresh
if last_refresh_delta.days >= 9:
refresh_user_token(user)
return True
else:
return False
Это может сработать, но я чувствую себя очень обременительно, когда посредник сообщений может использоваться только для планирования необходимых задач.