Custom json сериализация в сельдерее - PullRequest
0 голосов
/ 11 апреля 2020

Я пытаюсь использовать сельдерей с пользовательскими объектами, для которых я реализовал собственный сериализатор, но работники сельдерея пытаются использовать травление.

celeryconfig.py

broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
imports = ('tasks',)
accept_content = ['application/x-json']
task_serializer = 'custom_json'
result_serializer = 'custom_json'

app.py

from celery import Celery
from . import serializers
from . import celeryconfig

from kombu import serialization
serialization.register(
    'custom_json',
    serializers.dumps,
    serializers.loads,
    content_type='application/x-json',
    content_encoding='utf-8',
)

app = Celery()
app.config_from_object(celeryconfig)

if __name__ == '__main__':
    app.start()

main.py

from .tasks import my_task
my_obj = CustomClass()
my_task.delay(my_obj)

Этот код работает нормально, если мой класс был определен в python:

class CustomClass:
    def __init__(self):
        ...

Но мой CustomClass на самом деле происходит от привязки Boost. Python, которую я импортирую из файла .so, а затем получаю следующую ошибку из работник:

[2020-04-11 16:25:08,102: INFO/MainProcess] Received task: my_task[f73a3119-65d7-4a04-9e0d-2bc25ad19dde]  
...
RuntimeError: Pickling of "CustomClass" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)

Я понимаю, что сообщение об ошибке предлагает покопаться в спецификациях их рассола c. Но весь смысл использования пользовательских сериализаторов json заключается не в том, чтобы go идти по этому пути.

Итак, мой вопрос: почему сельдерей даже пытается использовать травление?

Редактировать: В качестве фиктивного примера следующий класс (без наддува), который не может быть извлечен, может привести к следующей ошибке:

 Unrecoverable error: TypeError("can't pickle generator objects",)

class NonPicklableClass:

    def __init__(self, arg):
        self.gen = (i for i in arg)


class CustomEncoder(json.JSONEncoder):

    def default(self, o):
        if isinstance(o, NonPicklableClass):
            return {
                '__type__': 'custom',
                'raw': list(o.gen),
            }
        return o

def hook(o):
    dtype = o.get('__type__')
    if dtype == 'custom':
        return NonPicklableClass(o['raw'])

def dumps(o):
    return json.dumps(o, cls=CustomEncoder)

def loads(s):
    return json.loads(s, object_hook=hook)

Я явно что-то недопонимаю

1 Ответ

0 голосов
/ 13 апреля 2020

Думаю, я понял, что задания отправляются рабочим с помощью пользовательских сериализаторов.

Однако внутри каждого работника данные передаются через каждый процесс с использованием обычной python сортировки.

...