Я нашел решение получить аргументы задачи в том виде, в котором они вызваны (list / dict) вместо строк.
Параметры argsrepr
и kwargsrepr
метода apply_async
позволяют указатьпользовательские представления для аргументов задачи.
Я создал собственный класс Task , переопределяя метод delay
следующим образом:
import json
from celery import Task
class FlowerTask(Task):
def delay(self, *args, **kwargs):
argsrepr, kwargsrepr = [], {}
for arg in args:
if isinstance(arg, bytes):
argsrepr.append("<binary content>")
elif isinstance(arg, list):
argsrepr.append("<list ({} items)".format(len(arg)))
else:
...
for key, value in kwargs.items():
...
# Create and call the task with the reprs we just built
new_task = super().s(*args, **kwargs)
return new_task.apply_async(argsrepr=json.dumps(argsrepr), kwargsrepr=json.dumps(kwargsrepr))
Затем я использую этот класс какбаза для моей задачи:
@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
return "OK !"
Таким образом, представления аргументов задачи сохраняются в виде JSON, который я могу затем загрузить в format_task
Flower и использовать их как объекты, а не строки:
def format_task(task):
task.args = json.loads(task.args)
if not task.args:
task.args = "( )"
else:
task.args = ', '.join(arg for arg in task.args)
# [...]
Я до сих пор не нашел способ скрыть задачи.Я думаю, что мне, возможно, придется настроить CSS из Flower.