Полезная нагрузка ответа не завершена с использованием asyncio / aiohttp - PullRequest
0 голосов
/ 28 мая 2019

Я написал сценарий Python 3.7, который асинхронно (asyncio 3.4.3 and aiohttp 3.5.4) создает Salesforce массовое API (v45.0) задание / пакет, используя несколько объектов, запрашиваемых одним оператором SOQL, ожидает завершения пакетов,по завершении загружает (передает) результаты на сервер, выполняет некоторые преобразования данных и, наконец, синхронно выгружает результаты в SQL Server 2016 SP1 (13.0.4560.0).У меня было много успешных пробных запусков с этим и я думал, что он работал отлично, однако, я недавно начал периодически получать следующую ошибку, и я в некотором затруднении, как исправить, так как очень мало отчетов / решений этой проблемыв Интернете:

aiohttp.client_exceptions.ClientPayloadError: Полезная нагрузка ответа не завершена

Пример кода:

import asyncio,aiohttp,aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = 
{'Account': {'job':
                {'batch': {'id': '8596P00000ihwpJulI','results': ['8596V00000Bo9iU'],'state': 'Completed'},
             'id': '8752R00000iUjtReqS'},
             'soql': 'select Id,Name from Account'},

 'Contact': {'job':
                {'batch': {'id': '9874G00000iJnBbVgg','results': ['7410t00000Ao9vp'],'state': 'Completed'},
             'id': '8800o00000POIkLlLa'},
             'soql': 'select Id,Name from Contact'}}

async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data) #list of batch results
            for resultID in batchResults:
                async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
                    async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile: #save in temporary file for manipulation later
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)

async def asyncDownload():
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])

if __name__ == "__main__":
    asyncio.run(asyncDownload())

Traceback (строки ошибокне соответствует приведенному выше фрагменту кода):

Трассировка (последний последний вызов):

Файл "C: \ Code \ salesforce.py", строка 252, в асинхронном режиме.run (asyncDownload ())

Файл "C: \ Program Files \ Python37 \ lib \ asyncio \ runners.py", строка 43, в run return loop.run_until_complete (main)

Файл«C: \ Program Files \ Python37 \ lib \ asyncio \ base_events.py», строка 584, в run_until_complete, возвращает future.result ()

Файл «C: \ Code \ salesforce.py», строка 241,в asyncDownload ожидают asyncio.gather (* [retrieveResults (objectDictionary [sfObject] ['job'] ['id'], objectDictionary [sfObject] ['job'] ['batch'] ['id'], sfObject) для sfObject в objectDictionary])

Файл "C: \ Code \ salesforce.py ", строка 183, в retrieveResults chunk = await r.content.read (81920)

Файл" C: \ Program Files \ Python37 \ lib \ site-packages \ aiohttp \ streams ".py ", строка 369, чтение в ожидании, ожидание self._wait ('read')

Файл" C: \ Program Files \ Python37 \ lib \ site-packages \ aiohttp \ streams.py ", строка 297,in _wait await waiter

aiohttp.client_exceptions.ClientPayloadError: Полезная нагрузка ответа не завершена

Кажется, корень проблемы начинается с r.content.read(81920), который должен передавать данные в 81920 байткуски, но это все, что я могу получить.

Я знаю, что это не проблема сети с моей стороны, так как есть другие небольшие задания, подключенные к внешним источникам на этом сервере, которые завершаются без проблем, пока выполняется это задание,Кто-нибудь знает, что здесь происходит?Я делаю что-то неправильно?Мой код просто плохой или что-то в этом роде?

Спасибо!

-Редакт:

Я пробовал iter_any() вместо read()и все равно получаю ту же ошибку ...

async for data in r.content.iter_any():
    await outfile.write(data)

Я пытался readline() и все равно получаю ту же ошибку ...

async for line in r.content.readline():
    await outfile.write(line)
...