Можно ли передавать OrderedDict в качестве аргумента задачи Celery? - PullRequest
0 голосов
/ 06 ноября 2018

У меня есть внутри сериализатора Django REST Framework переопределенный update метод .
В этом update, поскольку пользователь может отправить много детей, у меня есть асинхронное задание Сельдерея process_children, чтобы разобраться с детьми.

class MyModelSerializer(serializers.ModelSerializer):
    ....

    @transaction.atomic
    def update(self, mymodel, validated_data):
        try:
            children_data = validated_data.pop('children')
            transaction.on_commit(lambda: process_children.apply_async(
                countdown=1,
                args=[mymodel.id, children_data]))
        except KeyError:
            pass
        ...

В аргументах есть один аргумент, который является не json объектом, а OrderedDict: children_data.

Задание выглядит так:

@app.task
def process_children(mymodel_id, children_data):
    mymodel = MyModel.objects.get(pk=mymodel_id)
    children = mymodel.children.all()
    for child_data in children_data:
        try:
            child = children.get(start=child_data['start'])
            child = populate_child(child, child_data)
            child.save()
        except Child.DoesNotExist:
            create_child(mymodel, child_data)

Я читал, что мы должны отправлять только json (или рассол, ямл, что угодно ...) аргументов.

  • Но эта настройка, кажется, работает
  • Я даже могу отправить объект datetime (т. Е. Атрибут start, который я использую в задаче для сопоставления сохраненного потомка с новыми значениями, отправленными через API).

Так что здесь происходит?

  • Все в порядке, сельдерей сериализует и десериализует OrderedDict как босс.
  • Или я сумасшедший и должен сериализоваться перед вызовом задачи и десериализоваться внутри задачи?

[ОБНОВЛЕНИЕ, добавление настроек CELERY]

CELERY_BROKER_URL = get_env_variable('REDIS_URL')
CELERY_BROKER_POOL_LIMIT = 0
CELERY_REDIS_MAX_CONNECTIONS = 10
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'

Ответы [ 2 ]

0 голосов
/ 08 ноября 2018

Да, вы делаете это правильно.

Как уже упоминалось в документ .

Данные, передаваемые между клиентами и работниками, должны быть сериализованы, поэтому каждое сообщение в Celery имеет заголовок content_type, который описывает метод сериализации, используемый для его кодирования.

Кроме того, из сельдерея 4.0 по умолчанию сериализатором является JSON (который был pickle ранее). Поэтому всякий раз, когда вы вызываете эту задачу, сельдерей по умолчанию выполняет сериализацию и десериализацию. Если вы хотите использовать любой другой сериализатор, то при вызове задачи вам нужно указать content-type (если вы используете .delay, то по умолчанию сериализатором будет json.

process_children.apply_async((model_id, children_data), serializer='pickle')
0 голосов
/ 07 ноября 2018

Вы используете сериализатор pickle, который будет обрабатывать объекты относительно хорошо, но есть проблемы с ним. Вот сообщение в блоге о концепции сериализации и сельдерея.

...