Как использовать мультипроцессинг с модулем запросов? - PullRequest
0 голосов
/ 25 февраля 2019

Я новый разработчик в Python.Мой код является кодом ниже:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def run(self):
        with open('ips.txt', 'r') as urls:
            for url in urls.readlines():
                req = url.strip()
                try:
                    page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                        timeout=10)
                    soup = BS(page.text)
                    # string = string.encode('ascii', 'ignore')
                    print('\033[32m' + req + ' - Title: ', soup.title)
                except requests.RequestException as e:
                    print('\033[32m' + req + ' - TimeOut!')
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

Я пытаюсь заставить программу читать IPs.txt и распечатывать заголовок каждого веб-сайта.

Она работает без сбоев в одной ветке.Теперь я хочу сделать это быстрее, используя multiprocessing.

Но по какой-то причине он просто выводит одну и ту же строку 5 раз.Я новичок в многопроцессорной обработке и старался изо всех сил при неудачных попытках.

Снимок экрана, показывающий проблему:

screen shot showing problem

Я просто хочу запустить5 рабочих, чтобы проверить IPs.txt в многопоточности или параллельно ... Я просто хочу сделать это быстрее.

Любой намек, подсказка, помощь?

1 Ответ

0 голосов
/ 25 февраля 2019

Проблема

Основная проблема в вашем коде заключается в том, что каждый Worker открывает ips.txt с нуля и работает с каждым URL-адресом, найденным в ips.txt.Таким образом, пять рабочих вместе открывают ips.txt пять раз и работают с каждым URL-адресом пять раз.

Решение

Правильный способ решения этой проблемы - разделить код на master и рабочий .Вы уже реализовали большую часть рабочего кода.Давайте пока рассмотрим основной раздел (под if __name__ == '__main__':) как мастер.

Теперь мастер должен запустить пять рабочих и отправить им работу через очередь (multiprocessing.Queue).

Класс multiprocessing.Queue дает возможность нескольким производителям помещать в него данные, а нескольким потребителям - читать данные из него, не сталкиваясь с условиями гонки.Этот класс реализует всю необходимую семантику блокировки для безопасного обмена данными в многопроцессорном контексте и предотвращения гонки.

Фиксированный код

Вот как ваш код может быть переписан в соответствии с тем, что я описалвыше:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def __init__(self, job_queue):
        super().__init__()
        self._job_queue = job_queue

    def run(self):
        while True:
            url = self._job_queue.get()
            if url is None:
                break

            req = url.strip()

            try:
                page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                    timeout=10)
                soup = BS(page.text)
                # string = string.encode('ascii', 'ignore')
                print('\033[32m' + req + ' - Title: ', soup.title)
            except requests.RequestException as e:
                print('\033[32m' + req + ' - TimeOut!')


if __name__ == '__main__':
    jobs = []
    job_queue = multiprocessing.Queue()

    for i in range(5):
        p = Worker(job_queue)
        jobs.append(p)
        p.start()

    # This is the master code that feeds URLs into queue.
    with open('ips.txt', 'r') as urls:
        for url in urls.readlines():
            job_queue.put(url)

    # Send None for each worker to check and quit.
    for j in jobs:
        job_queue.put(None)

    for j in jobs:
        j.join()

В приведенном выше коде мы видим, что мастер открывает ips.txt один раз, считывает с него URL-адреса один за другим и помещает их в очередь.Каждый работник ожидает URL, чтобы прибыть в эту очередь.Как только в очередь поступает URL-адрес, один из работников забирает его и становится занятым.Если в очереди есть еще несколько URL-адресов, следующий свободный работник выбирает следующий и т. Д.

Наконец, нам нужен какой-то способ для работников завершить работу, когда вся работа завершена.Есть несколько способов добиться этого.В этом примере я выбрал простую стратегию отправки пяти дозорных значений (в данном случае пять None значений), по одному для каждого работника, чтобы каждый работник мог подобрать это и выйти.

Существует другая стратегия, при которой рабочие и мастер совместно используют объект multiprocessing.Event точно так же, как они разделяют объект multiprocessing.Queue прямо сейчас.Мастер вызывает метод set() этого объекта всякий раз, когда он хочет, чтобы рабочие ушли.Рабочие проверяют, если этот объект is_set() и выходят.Однако это вносит некоторую дополнительную сложность в код.Я обсуждал это ниже.

Для полноты картины, а также для демонстрации минимальных, полных и проверяемых примеров ниже я представляю два примера кода, которые показывают обе стратегии остановки.

Использование Sentinel Value для остановки рабочих

Это в значительной степени то, что я описал выше, за исключением того, что пример кода значительно упрощен для удаления зависимостей от любых библиотек за пределами стандартной библиотеки Python.

Еще одна вещь, которую стоит отметить в приведенном ниже примере, состоит в том, что вместо создания рабочего класса мы используем рабочую функцию и создаем из нее Process.Этот тип кода часто встречается в документации по Python и довольно идиоматичен.

import multiprocessing
import time
import random


def worker(input_queue):
    while True:
        url = input_queue.get()

        if url is None:
            break

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.Queue()
    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(input_queue, ))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Ask the workers to quit.
    for w in workers:
        input_queue.put(None)

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()

Использование события для остановки рабочих

Использование объекта multiprocessing.Event для оповещения, когда работники должны выйти, вводитнекоторая сложность в коде.Прежде всего необходимо внести три изменения:

  • В мастере мы вызываем метод set() объекта Event, чтобы указать, что работники должны выйти как можно скорее.
  • В работнике мы периодически вызываем метод is_set() объекта Event, чтобы проверить, должен ли он выйти.
  • В мастере нам нужно использовать multiprocessing.JoinableQueue вместо multiprocessing.Queue чтобы он мог проверить, полностью ли заняты очередь рабочими, прежде чем он попросит рабочих выйти.
  • В работнике нам нужно вызывать метод task_done() очереди после каждого элемента изочередь расходуется.Это необходимо для мастера, чтобы вызвать метод join() очереди, чтобы проверить, была ли она очищена.

Все эти изменения можно найти в коде ниже:

import multiprocessing
import time
import random
import queue


def worker(input_queue, stop_event):
    while not stop_event.is_set():
        try:
            # Check if any URL has arrived in the input queue. If not,
            # loop back and try again.
            url = input_queue.get(True, 1)
            input_queue.task_done()
        except queue.Empty:
            continue

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.JoinableQueue()
    stop_event = multiprocessing.Event()

    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker,
                                    args=(input_queue, stop_event))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Wait for the queue to be consumed.
    input_queue.join()

    # Ask the workers to quit.
    stop_event.set()

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
...