Как я могу получить доступ к возвращаемому значению из скрученных отложенных обратных вызовов с помощью AsyncioSelectorReactor? - PullRequest
0 голосов
/ 17 июня 2020

Используя Python 3.7.7, Twisted 20.3.0 (и Scrapy 2.1.0), когда я пытаюсь ...

doc_link = await self.upload_reseller_document(doc_request, self.create_id(contract))

Я получаю deferred вместо строки. Также мои обратные вызовы не ожидаются.

Ожидается: https://s3.amazonaws.com/some-bucket/some_file.csv или None

Получено: <Deferred at 0x11ae61dd0 current result: None>

    async def conditional_upload(request):
        docs_bucket = 'some-bucket'
        key = f'some-prefix/some_file.csv'
        url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'
        async def cb(obj):
            print('found key, returning url')
            return defer.success(url)

        async def upload_doc():
            print('called upload_doc')
            response = await self.crawler.engine.download(request, self)
            if response.status != 200:
                # Error happened, return item.
                print('could not download reseller csv')
                return defer.error(None)
            print('uploading to', docs_bucket, key)
            return threads.deferToThread(
                self.s3client.put_object,
                Bucket=docs_bucket,
                Key=key,
                Body=response.body)

        async def eb(failure):
            print('did not find key')
            if failure.type != ClientError:
                raise failure.value
            return upload_doc()

        return ensureDeferred(threads.deferToThread(
                self.s3client.head_object,
                Bucket=docs_bucket,
                Key=key).addCallbacks(cb, eb))

1 Ответ

0 голосов
/ 18 июня 2020

Internal Twisted имеет дело только с Deferreds и функциями, которые его возвращают, вы не можете передавать async функции как обратные вызовы Deferreds (при вызове функции asyn c возвращают объект coroutine ), если если вы это сделаете, обратный вызов не будет иметь никакого эффекта, и при остановке реактора вы получите предупреждение «coroutine x никогда не ожидался».
При использовании функций asyn c вы должны просто await Deferreds fini sh и обработайте их результат вместо добавления обратных вызовов и их возврата. Цель функций asyn c - избежать ада обратного вызова .

defer.ensureDeferred используется для обертывания сопрограмм в deferred и позволяет Twisted планировать их для запуска вы используете его, когда вам нужно вызвать функции asyn c внутри функций, которые не являются asyn c.

Используйте try / catch для обработки исключений (это эквивалент errback, но исключение не заключено в скрученные Failure):

async def conditional_upload(request):
    docs_bucket = 'some-bucket'
    key = f'some-prefix/some_file.csv'
    url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'

    async def upload_doc():
        print('called upload_doc')
        response = await self.crawler.engine.download(request, self)
        if response.status != 200:
            # Error happened, return item.
            print('could not download reseller csv')
            raise Exception('could not download reseller csv')
        print('uploading to', docs_bucket, key)
        return await threads.deferToThread(
            self.s3client.put_object, Bucket=docs_bucket, Key=key, Body=body
        )

    # propably here you want to check if something already exists
    try:
        await threads.deferToThread(self.s3client.head_object, Bucket=docs_bucket, Key=key)
        print('found key, returning url')
        return url
    except ClientError:
        print('did not find key, going to upload_doc ...')

    # if does not exists, then create it
    retry_attempts = 10 # avoid infinite loop
    for _ in range(retry_attempts):
        try:
            await upload_doc()
            print('Uploaded the key, returning url')
            return url
        except ClientError:
            print('Failed to upload the key, retrying...')

    print('Failed to upload the key, max attemps tried.')
...