Есть ли способ извлечь HTTP-ответ, используя функцию опроса Python - PullRequest
0 голосов
/ 29 октября 2019

Я использовал следующий код для опроса сервера pub / sub

    import requests
    polling2.poll(lambda: requests.get('http://google.com').status_code == 200,
    step=60,
    poll_forever=True)

Если это возвращает true, есть ли способ получить доступ к телу ответа для получения определенных полей? В моем случае этовозвращает ответ JSON

1 Ответ

0 голосов
/ 29 октября 2019

Вы можете присвоить poll() результат переменной, и он будет иметь значение, возвращаемое lambda

result = polling2.poll(lambda: requests.get('http://google.com').status_code == 200, 
                       step=60, 
                       poll_forever=True)

Но в текущем коде lambda возвращает True и result будет True и не будет доступа к requests

Но вы можете написать его по-другому, используя check_success= для проверки состояния

import requests
import polling

def test(response):
    return response.status_code == 200

result = polling.poll(lambda: requests.get('http://google.com'), 
                      step=60, 
                      poll_forever=True, 
                      check_success=test)

print(result.text)

Теперь lambda возвращает response иpoll отправляет response в функцию test, которая проверяет status_code. Если test вернет True, то будет response до result, и вы можете использовать .text, .content или .json()


Я нашел его в исходном кодеиз polling.py


РЕДАКТИРОВАТЬ: Показывает, как использовать collect_value=. Но queue получает все значения, кроме последнего, которое у вас есть в result. Таким образом, он собирает результаты для ответов, состояние которых отличается от 200 (или сообщений об ошибках, если они содержат ошибки).

import requests
import polling
import queue

def test(response):
    return response.status_code == 200

my_queue = queue.Queue()

result = polling.poll(lambda: requests.get('http://google.com'), 
                      step=60, 
                      poll_forever=True, 
                      check_success=test,
                      collect_value=my_queue)

if my_queue.empty():
    print('empty')
else:
    while not my_queue.empty():
        print(my_queue.get())

#print(result.text)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...