Проблема
Основная проблема в вашем коде заключается в том, что каждый Worker
открывает ips.txt
с нуля и работает с каждым URL-адресом, найденным в ips.txt
.Таким образом, пять рабочих вместе открывают ips.txt
пять раз и работают с каждым URL-адресом пять раз.
Решение
Правильный способ решения этой проблемы - разделить код на master и рабочий .Вы уже реализовали большую часть рабочего кода.Давайте пока рассмотрим основной раздел (под if __name__ == '__main__':
) как мастер.
Теперь мастер должен запустить пять рабочих и отправить им работу через очередь (multiprocessing.Queue
).
Класс multiprocessing.Queue
дает возможность нескольким производителям помещать в него данные, а нескольким потребителям - читать данные из него, не сталкиваясь с условиями гонки.Этот класс реализует всю необходимую семантику блокировки для безопасного обмена данными в многопроцессорном контексте и предотвращения гонки.
Фиксированный код
Вот как ваш код может быть переписан в соответствии с тем, что я описалвыше:
import warnings
import requests
import multiprocessing
from colorama import init
init(autoreset=True)
from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)
from bs4 import BeautifulSoup as BS
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
class Worker(multiprocessing.Process):
def __init__(self, job_queue):
super().__init__()
self._job_queue = job_queue
def run(self):
while True:
url = self._job_queue.get()
if url is None:
break
req = url.strip()
try:
page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
timeout=10)
soup = BS(page.text)
# string = string.encode('ascii', 'ignore')
print('\033[32m' + req + ' - Title: ', soup.title)
except requests.RequestException as e:
print('\033[32m' + req + ' - TimeOut!')
if __name__ == '__main__':
jobs = []
job_queue = multiprocessing.Queue()
for i in range(5):
p = Worker(job_queue)
jobs.append(p)
p.start()
# This is the master code that feeds URLs into queue.
with open('ips.txt', 'r') as urls:
for url in urls.readlines():
job_queue.put(url)
# Send None for each worker to check and quit.
for j in jobs:
job_queue.put(None)
for j in jobs:
j.join()
В приведенном выше коде мы видим, что мастер открывает ips.txt
один раз, считывает с него URL-адреса один за другим и помещает их в очередь.Каждый работник ожидает URL, чтобы прибыть в эту очередь.Как только в очередь поступает URL-адрес, один из работников забирает его и становится занятым.Если в очереди есть еще несколько URL-адресов, следующий свободный работник выбирает следующий и т. Д.
Наконец, нам нужен какой-то способ для работников завершить работу, когда вся работа завершена.Есть несколько способов добиться этого.В этом примере я выбрал простую стратегию отправки пяти дозорных значений (в данном случае пять None
значений), по одному для каждого работника, чтобы каждый работник мог подобрать это и выйти.
Существует другая стратегия, при которой рабочие и мастер совместно используют объект multiprocessing.Event
точно так же, как они разделяют объект multiprocessing.Queue
прямо сейчас.Мастер вызывает метод set()
этого объекта всякий раз, когда он хочет, чтобы рабочие ушли.Рабочие проверяют, если этот объект is_set()
и выходят.Однако это вносит некоторую дополнительную сложность в код.Я обсуждал это ниже.
Для полноты картины, а также для демонстрации минимальных, полных и проверяемых примеров ниже я представляю два примера кода, которые показывают обе стратегии остановки.
Использование Sentinel Value для остановки рабочих
Это в значительной степени то, что я описал выше, за исключением того, что пример кода значительно упрощен для удаления зависимостей от любых библиотек за пределами стандартной библиотеки Python.
Еще одна вещь, которую стоит отметить в приведенном ниже примере, состоит в том, что вместо создания рабочего класса мы используем рабочую функцию и создаем из нее Process
.Этот тип кода часто встречается в документации по Python и довольно идиоматичен.
import multiprocessing
import time
import random
def worker(input_queue):
while True:
url = input_queue.get()
if url is None:
break
print('Started working on:', url)
# Random delay to simulate fake processing.
time.sleep(random.randint(1, 3))
print('Stopped working on:', url)
def master():
urls = [
'https://example.com/',
'https://example.org/',
'https://example.net/',
'https://stackoverflow.com/',
'https://www.python.org/',
'https://github.com/',
'https://susam.in/',
]
input_queue = multiprocessing.Queue()
workers = []
# Create workers.
for i in range(5):
p = multiprocessing.Process(target=worker, args=(input_queue, ))
workers.append(p)
p.start()
# Distribute work.
for url in urls:
input_queue.put(url)
# Ask the workers to quit.
for w in workers:
input_queue.put(None)
# Wait for workers to quit.
for w in workers:
w.join()
print('Done')
if __name__ == '__main__':
master()
Использование события для остановки рабочих
Использование объекта multiprocessing.Event
для оповещения, когда работники должны выйти, вводитнекоторая сложность в коде.Прежде всего необходимо внести три изменения:
- В мастере мы вызываем метод
set()
объекта Event
, чтобы указать, что работники должны выйти как можно скорее. - В работнике мы периодически вызываем метод
is_set()
объекта Event
, чтобы проверить, должен ли он выйти. - В мастере нам нужно использовать
multiprocessing.JoinableQueue
вместо multiprocessing.Queue
чтобы он мог проверить, полностью ли заняты очередь рабочими, прежде чем он попросит рабочих выйти. - В работнике нам нужно вызывать метод
task_done()
очереди после каждого элемента изочередь расходуется.Это необходимо для мастера, чтобы вызвать метод join()
очереди, чтобы проверить, была ли она очищена.
Все эти изменения можно найти в коде ниже:
import multiprocessing
import time
import random
import queue
def worker(input_queue, stop_event):
while not stop_event.is_set():
try:
# Check if any URL has arrived in the input queue. If not,
# loop back and try again.
url = input_queue.get(True, 1)
input_queue.task_done()
except queue.Empty:
continue
print('Started working on:', url)
# Random delay to simulate fake processing.
time.sleep(random.randint(1, 3))
print('Stopped working on:', url)
def master():
urls = [
'https://example.com/',
'https://example.org/',
'https://example.net/',
'https://stackoverflow.com/',
'https://www.python.org/',
'https://github.com/',
'https://susam.in/',
]
input_queue = multiprocessing.JoinableQueue()
stop_event = multiprocessing.Event()
workers = []
# Create workers.
for i in range(5):
p = multiprocessing.Process(target=worker,
args=(input_queue, stop_event))
workers.append(p)
p.start()
# Distribute work.
for url in urls:
input_queue.put(url)
# Wait for the queue to be consumed.
input_queue.join()
# Ask the workers to quit.
stop_event.set()
# Wait for workers to quit.
for w in workers:
w.join()
print('Done')
if __name__ == '__main__':
master()