Читать Http Stream - PullRequest
       23

Читать Http Stream

0 голосов
/ 30 ноября 2018

Я пытаюсь прочитать из потокового API, куда данные отправляются с использованием Chunked Transfer Encoding.Может быть больше чем одна запись на блок, каждая запись отделена CRLF.И данные всегда отправляются с использованием сжатия gzip.Я пытаюсь получить канал, а затем сделать некоторую обработку за один раз.Я прошел через кучу ресурсов stackOverflow, но не смог найти способ сделать это в Python.размер iter_content (chunk) в моем случае вызывает исключение в строке.

for chunk in api_response.iter_content(chunk_size=1024): 

В Fiddler (который я использую в качестве прокси-сервера) я вижу, что данные постоянно загружаются и выполняют "COMETPeek "в Fiddler, я могу увидеть пример JSON.

Даже iter_lines не работает.Я посмотрел на случай asyncio и aiohttp, упомянутый здесь: Почему сгенерированный запрос ничто не возвращает.Какое время ожидания по умолчанию использует request.get ()?

, но не знаете, как выполнить обработку.Как вы можете видеть, я пытался использовать кучу библиотек Python.Извините, в коде могут быть некоторые библиотеки, которые я позже удалил из использования, поскольку он не работал.

Я также просмотрел документацию для библиотеки запросов, но не смог найти ничего существенного.

Как упоминалось выше, ниже приведен пример кода того, что я пытаюсь сделать.Любые указатели на то, как мне следует поступить, будут высоко оценены.

Я впервые пытаюсь прочитать поток

from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
import requests
import zlib
import json

READ_BLOCK_SIZE = 1024*8

clientID="ClientID"
clientSecret="ClientSecret"

proxies = {
"https": "http://127.0.0.1:8888",
}

client = BackendApplicationClient(client_id=clientID)
oauth = OAuth2Session(client=client)

token = oauth.fetch_token(token_url='https://baseTokenURL/token', client_id=clientID,client_secret=clientSecret,proxies=proxies,verify=False) 

auth_t=token['access_token']
#auth_t = accesstoken.encode("ascii", "ignore")

headers = {
'authorization': "Bearer " + auth_t,
'content-type': "application/json",
'Accept-Encoding': "gzip",
}
dec=zlib.decompressobj(32 + zlib.MAX_WBITS)

try:
    init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
    if init_res.status_code == 302:
        print(init_res.headers['Location'])
        api_response = requests.get(init_res.headers['Location'], headers=headers, allow_redirects=False,proxies=proxies,verify=False, timeout=20, stream=True,params={"smoothing":"1", "smoothingBucketSize" : "180"})
        if  api_response.status_code == 200:
            #api_response.raw.decode_content = True

            #print(api_response.raw.read(20))
            for chunk in api_response.iter_content(chunk_size=api_response.chunk_size): 
                #Parse the response
    elif init_res.status_code == 200:
        print(init_res.content)
except Exception as ce:
    print(ce)

ОБНОВЛЕНИЕ Я смотрю наэто сейчас: https://aiohttp.readthedocs.io/en/v0.20.0/client.html

Это будет путь?

1 Ответ

0 голосов
/ 01 декабря 2018

На всякий случай, если кто-то найдет это полезным.Я нашел способ для потоковой передачи из API через Python с помощью aiohttp.Ниже скелет.Помните, что это просто скелет, и он работает, постоянно показывая мне результаты.Если у кого-то есть лучший способ сделать это - я весь в ушах и глазах, так как это первый раз, когда я пытаюсь поймать ручей.

async def fetch(session, url, headers):
    with async_timeout.timeout(None):
        async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False,timeout=None) as r:
            while True:
                chunk=await r.content.read(1024*3)
                if not chunk:
                    break                    
                print(chunk)

async def main(url, headers):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url,headers)

В звонящем

try:
    init_res = requests.get('https://BaseStreamURL/api/1/stream/specificStream', headers=headers, allow_redirects=False,proxies=proxies,verify=False)
    if init_res.status_code == 302:
        loc=init_res.headers['Location']
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loc, headers=headers))
    elif init_res.status_code == 200:
        print(init_res.content)
except Exception as ce:
    print(ce)
...