Как создать и использовать параллельный менеджер http в Python? - PullRequest
0 голосов
/ 29 мая 2018

Я новичок в Python и хотел бы создать класс HttpParallelHandler для обработки в параллельном (многопоточном) режиме всех получаемых им HTTP-запросов.Клиенты (класс в других частях кода, который будет использовать HttpParallelHandler) должны регистрировать каждый HTTP-запрос, который они хотят сделать, передавая некоторые параметры, такие как метод, данные и обратный вызов onsuccess.

Послепример этого класса:

import threading
import time
import logging
import random
import Queue
import sys
import requests

class ParallelHttpHandler(object):
    POOL_SIZE = 10

    def __init__(self):
        self.requests_queue = Queue.Queue()
        self.callback_lock = threading.RLock()
        self.pool_threads = [RequestConsumerThread(self.requests_queue, self.callback_lock) for count in xrange(ParallelHttpHandler.POOL_SIZE)]
        for thread in self.pool_threads:
            thread.start()

    def http_get(self, url, data, headers=None, onsuccess=None, onerror=None):
        self.http_request("get", url, data, headers, onsuccess, onerror)

    def http_post(self, url, data, headers=None, onsuccess=None, onerror=None):
        self.http_request("post", url, data, headers, onsuccess, onerror)

    def http_request(self, method, url, data, headers, onsuccess, onerror):
        if not self.requests_queue.full():
            request = {
                "method": method,
                "url": url,
                "data": data,
                "headers": headers,
                "onsuccess": onsuccess,
                "onerror": onerror
            }
            self.requests_queue.put(request)

    def wait_all(self):
        self.requests_queue.join()

class RequestConsumerThread(threading.Thread):
    DEFAULT_HTTP_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Accept': 'text/html, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'
    }

    def __init__(self, requests_queue, callback_lock):
        super(RequestConsumerThread, self).__init__()
        self.requests_queue = requests_queue
        self.callback_lock = callback_lock
        self.daemon = True

    def http_get(self, url, data=None, headers=None):
        response = requests.get(url, data=data, headers=headers or RequestConsumerThread.DEFAULT_HTTP_HEADERS)
        return response.text if response.status_code == 200 else None
    def http_post(self, url, data, headers=None):
        response = requests.post(url, data=data, headers=headers or RequestConsumerThread.DEFAULT_HTTP_HEADERS)
        return response.text if response.status_code == 200 else None

    def run(self):
        while True:
            if not self.requests_queue.empty():
                request = self.requests_queue.get()
                try:
                    # Effettua la richiesta http
                    request_arguments = (request["url"], request["data"], request["headers"])
                    response = self.http_get(*request_arguments) if request["method"] == "get" else self.http_post(*request_arguments)

                    # In un contesto thread safe esegue la callback di success
                    self.callback_lock.acquire()
                    onsuccess = request["onsuccess"]
                    if onsuccess is not None:
                        onsuccess(response)
                    self.callback_lock.release()
                except Exception as e:
                    self.callback_lock.acquire()
                    onerror = request["onerror"]
                    if onerror is not None:
                        print(request["onerror"](e))
                    self.callback_lock.release()
                finally:
                    self.requests_queue.task_done()

Теперь проблема : когда я использую этот класс в цикле for для параллельной обработки 50 запросов http, у меня возникает проблема с обратным вызовом, который яназначить на каждый запрос.

Пример кода (реальный код более сложный):

if __name__ == '__main__':
    def onsuccess(html_code, index): 
        print "Success: html received. index: " + str(index)
    def onerror(e): 
        print "Error: " + str(e)

    http_handler = ParallelHttpHandler()
    urls = ["https://google.com?count=%d" % n for n in range(50)]
    for index, url in enumerate(urls):
        http_handler.http_get(url, None, None, lambda html_code: onsuccess(html_code, index), onerror)

    http_handler.wait_all()

Этот код выводит следующее:

Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49

Почему всегда 49?Я знаю, что в Python стратегия передачи - call-by-reference, но код не работает, даже если я передаю copy.copy (index).

1 Ответ

0 голосов
/ 29 мая 2018

Это из-за поздних правил связывания и определения области действия (относительно циклов) в Python.Не имеет отношения к копированию и всему HTTP.Несколько связано с звонками по ссылке.См. Более простой пример:

>>> lst = []
>>> for x in range(5):
...     lst.append(lambda : x)
... 
>>> for l in lst:
...     l()
... 
4
4
4
4
4

Решение - обернуть lambda функцией.Это производит новую область:

>>> lst = []
>>> def factory(x):
...     return lambda : x
... 
>>> for x in range(5):
...     lst.append(factory(x))
... 
>>> for l in lst:
...     l()
... 
0
1
2
3
4
...