Как сделать задачу celery_app в приложении пирамиды осведомленной о cornice_services и маршрутах, чтобы получить request.route_url? - PullRequest
0 голосов
/ 09 января 2020

У меня есть простая задача для отправки электронных писем с URL-адресом внутри, чтобы присоединиться к моей заявке.

@celery_app.task(base=TransactionalTask)
def send_invitation(invitation_id):
    request = get_current_request()
    mail_service = MailService(request)
    invitation = request.dbsession.query(Invitation) \
        .filter(Invitation.id == invitation_id) \
        .one()

    _app_url = request.registry.settings.get('app_url', request.host_url)
    link = request.route_url('invitation_token', token=invitation.token, _app_url=_app_url)
    message = f'{invitation.message}. {link}'
    mail_service.send_message(invitation.email, message)
    invitation.status = InvitationStatusEnum.Sent
    invitation.scheduled_for = None

Мой TransactionalTask ​​выглядит так:

class TransactionalTask(celery_app.Task):
    """ TransactionalTask is a base task for jobs that needs to be zope transaction aware
        It is possible to pass request as a first argument to a task function.
        Request object is returned from scripting.prepare() method as one of dictionary keys. 
    """
    abstract = True

    def __call__(self, *args, **kwargs):
        registry = celery_app.conf['PYRAMID_REGISTRY']
        environment = scripting.prepare(registry=registry)
        try:
            super().__call__(*args, **kwargs)
        finally:
            environment['closer']()

    def after_return(self, *args, **kwargs):
        get_transaction().commit()

    def on_failure(self, *args, **kwargs):
        get_transaction().abort()

И когда я пытаюсь сделать это в асинхронном режиме c, я получаю: KeyError: 'No such route named invitation_token'. Поэтому, когда я проверяю маршруты, это пустой список и request.cornice_services также являются пустым списком.

...