Вы можете добавить сигналы использования:
from django.db.models.signals import post_save
from django.dispatch import receiver
from django_celery_beat.models import PeriodicTask, IntervalSchedule
class Project(models.Model):
project_name = models.CharField(max_length=200,unique=True)
project_scan = models.IntegerField()
project_status = models.BooleanField()
def set_periodic_task(self, task_name):
schedule = self.get_or_create_interval()
PeriodicTask.objects.create(
interval=schedule,
name=f'{self.project_name}-{self.id}',
task=task_name,
)
def get_or_create_interval(self):
schedule, created = IntervalSchedule.objects.get_or_create(
every=self.project_scan,
period=IntervalSchedule.HOURS,
)
return schedule
def get_periodic_task(self, task_name):
interval = self.get_or_create_interval()
periodic_task = PeriodicTask.objects.get(
interval=interval,
name=f'{self.project_name}-{self.id}',
task=task_name,
)
return periodic_task
def sync_disable_enable_task(self, task_name):
periodic_task = self.get_periodic_task(task_name)
periodic_task.enabled = self.project_status
periodic_task.save()
@receiver(post_save, sender=Project)
def set_or_sync_periodic_task(sender, instance=None, created=False, **kwargs):
if created:
instance.set_periodic_task(task_name='project_tasks')
else:
instance.sync_disable_enable_task(task_name='project_tasks')
Что у вас есть: при создании нового экземпляра Project
новое Периодическое задание сохраняется с методом set_periodic_task
.Если вы хотите disable
или enable
периодическое задание вашего экземпляра, просто измените статус project_status
и сохраните его.Он включит sync_disable_enable_task
метод для включения или отключения.
, если вы хотите передать аргументы, вы можете сделать:
PeriodicTask.objects.create(
interval=schedule,
name=f'{self.project_name}-{self.id}',
task='proj.tasks.import_contacts',
args=json.dumps(['arg1', 'arg2']),
kwargs=json.dumps({
'some_kwarg': '123,
}),
)