Загрузки с ограничением скорости среди нескольких процессов - PullRequest
5 голосов
/ 21 марта 2019

Я хочу скачать и обработать много файлов с сайта. Условия обслуживания сайта ограничивают количество файлов, которые вам разрешено загружать в секунду.

Время, необходимое для обработки файлов, на самом деле является узким местом, поэтому я хотел бы иметь возможность обрабатывать несколько файлов параллельно. Но я не хочу, чтобы различные процессы объединялись, чтобы нарушить лимит загрузки. Так что мне нужно что-то, что ограничивает скорость запроса. Я думал что-то вроде следующего, но я не совсем эксперт с модулем multiprocessing.

import multiprocessing
from multiprocessing.managers import BaseManager
import time

class DownloadLimiter(object):

    def __init__(self, time):
        self.time = time
        self.lock = multiprocessing.Lock()

    def get(self, url):
        self.lock.acquire()
        time.sleep(self.time)
        self.lock.release()
        return url


class DownloadManager(BaseManager):
    pass

DownloadManager.register('downloader', DownloadLimiter)


class Worker(multiprocessing.Process):

    def __init__(self, downloader, queue, file_name):
        super().__init__()
        self.downloader = downloader
        self.file_name = file_name
        self.queue = queue

    def run(self):
        while not self.queue.empty():
            url = self.queue.get()
            content = self.downloader.get(url)
            with open(self.file_name, "a+") as fh:
                fh.write(str(content) + "\n")

Затем в другом месте запускаются загрузки с

manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()

urls = range(50)
for url in urls:
    queue.put(url)

job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]

for job in jobs:
    job.start()

for job in jobs:
    job.join()

Это, кажется, делает работу в небольшом масштабе, но я немного опасаюсь, действительно ли блокировка выполняется правильно.

Кроме того, если есть лучшая схема достижения той же цели, я бы с удовольствием ее услышал.

Ответы [ 5 ]

1 голос
/ 26 марта 2019

Это можно сделать чисто с помощью Ray , который является библиотекой для параллельного и распределенного Python.

Ресурсы в Ray

Когда выЗапустите Рэй, вы можете сказать, какие ресурсы доступны на этой машине.Ray автоматически попытается определить количество ядер ЦП и количество графических процессоров, но они могут быть указаны, и фактически произвольные пользовательские ресурсы также могут быть переданы, например, путем вызова

ray.init(num_cpus=4, resources={'Network': 2})

Это говорит Рэю, что на машине имеется 4 ядра ЦП и 2 пользовательских ресурса под названием Network.

Каждая «задача» Рэя, которая является планируемым блоком работы,имеет определенные требования к ресурсам.По умолчанию для задачи требуется 1 ядро ​​процессора и больше ничего.Однако произвольные требования к ресурсам можно указать, объявив соответствующую функцию с помощью

@ray.remote(resources={'Network': 1})
def f():
    pass

. Это говорит Рэю, что для выполнения f в «рабочем» процессе должно быть 1 ядро ​​ЦП (значение по умолчанию) и ресурс 1 Network.

Поскольку на компьютере установлено 2 ресурса Network и 4 ядра ЦП, одновременно может выполняться не более 2 копий f.С другой стороны, если есть другая функция g, объявленная с

@ray.remote
def g():
    pass

, то одновременно могут быть выполнены четыре копии g или две копии f и две копии gбыть выполненным одновременно.

Пример

Вот пример с заполнителями для фактических функций, используемых для загрузки контента и его обработки.

import ray
import time

max_concurrent_downloads = 2

ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})

@ray.remote(resources={'Network': 1})
def download_content(url):
    # Download the file.
    time.sleep(1)
    return 'result from ' + url

@ray.remote
def process_result(result):
    # Process the result.
    time.sleep(1)
    return 'processed ' + result

urls = ['url1', 'url2', 'url3', 'url4']

result_ids = [download_content.remote(url) for url in urls]

processed_ids = [process_result.remote(result_id) for result_id in result_ids]

# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)

Вот описание временной шкалы (которое вы можете получить, запустив ray timeline из командной строки и открыв получившийся файл JSON в chrome: // tracing в веб-браузере Chrome).

В приведенном выше сценариимы представляем 4 download_content задач.Это те, которые мы ограничиваем, указав, что им требуется ресурс Network (в дополнение к ресурсу ЦП 1 по умолчанию).Затем мы отправляем 4 process_result задач, каждая из которых требует по умолчанию 1 ресурс ЦП.Задачи выполняются в три этапа (просто посмотрите на поля blue ).

  1. Мы начинаем с выполнения 2 download_content задач, что столько, сколько можно выполнить за одинвремя (из-за ограничения скорости).Мы пока не можем выполнить ни одну из задач process_result, поскольку они зависят от результатов выполнения задач download_content.
  2. Они завершаются, поэтому мы начинаем выполнять оставшиеся две задачи download_content, а также двеprocess_result задач, потому что мы не ограничиваем скорость process_result задач.
  3. Мы выполняем оставшиеся process_result задач.

Каждая "строка" - это один рабочий процесс.Время идет слева направо.

enter image description here

Подробнее о том, как это сделать, можно прочитать в документации Ray .

0 голосов
/ 29 марта 2019

ОК, после следующего пояснения от ОП

Под "загрузками в секунду" я имею в виду, что в глобальном масштабе не более чем количество загрузок, начинающихся в секунду.

Я решил опубликовать другой ответ, так как думаю, что мой первый может быть также интересен для тех, кто хочет ограничить число одновременно работающих процессов.

Я думаю, что для решения этой проблемы нет необходимости использовать дополнительные платформы.Идея состоит в том, чтобы использовать потоки загрузки, которые порождаются для каждой ссылки на ресурс, очереди ресурсов и фиксированного числа процессоров, которые являются процессами, а не потоками:

#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue


WORKERS = 4
DOWNLOADS_PER_SECOND = 2


def download_resource(url, resource_queue):
    pid = os.getpid()

    t = time.strftime('%H:%M:%S')
    print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
          flush=True)
    time.sleep(random.randint(1, 10))

    results = '[resource {}]'.format(url)
    resource_queue.put(results)


def process_resource(resource_queue):
    pid = os.getpid()

    while True:
        res = resource_queue.get()

        print('Process {p} is processing {r}'.format(p=pid, r=res),
              flush=True)
        time.sleep(random.randint(1, 10))

        resource_queue.task_done()


def main():
    resource_queue = JoinableQueue()

    # Start process workers:
    for _ in range(WORKERS):
        worker = Process(target=process_resource,
                         args=(resource_queue,),
                         daemon=True)
        worker.start()

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    while urls:
        target_urls = urls[:DOWNLOADS_PER_SECOND]
        urls = urls[DOWNLOADS_PER_SECOND:]

        # Start downloader threads:
        for url in target_urls:
            downloader = Thread(target=download_resource,
                                args=(url, resource_queue),
                                daemon=True)
            downloader.start()

        time.sleep(1)

    resource_queue.join()


if __name__ == '__main__':
    main()

Результаты выглядят примерно так:

$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]

Здесь каждый второй DOWNLOADS_PER_SECOND поток запускается, два в этом примере, которые затем загружают и помещают ресурсы в очередь.WORKERS - это ряд процессов, которые получают ресурсы из очереди для дальнейшей обработки.С помощью этой настройки вы сможете ограничить число загружаемых загрузок в секунду, а работники будут параллельно обрабатывать полученные ресурсы.

0 голосов
/ 27 марта 2019

Не совсем понятно, что вы имеете в виду под «ограничением скорости загрузки». В данном случае это число одновременных загрузок , что является частым вариантом использования, и я думаю, что простое решение состоит в использовании семафоров с пулом процессов:

#!/usr/bin/env python3
import os
import time
import random
from functools import partial
from multiprocessing import Pool, Manager


CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2


def download(url, semaphore):
    pid = os.getpid()

    with semaphore:
        print('Process {p} is downloading from {u}'.format(p=pid, u=url))
        time.sleep(random.randint(1, 5))

    # Process the obtained resource:
    time.sleep(random.randint(1, 5))

    return 'Successfully processed {}'.format(url)


def main():
    manager = Manager()

    semaphore = manager.Semaphore(CONCURRENT_DOWNLOADS)
    target = partial(download, semaphore=semaphore)

    urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]

    with Pool(processes=CPU_NUM) as pool:
        results = pool.map(target, urls)

    print(results)


if __name__ == '__main__':
    main()

Как видите, одновременно загружаются только CONCURRENT_DONWLOADS процессов, в то время как другие заняты обработкой полученных ресурсов.

0 голосов
/ 26 марта 2019

Самый простой подход заключается в загрузке в основной поток и подаче документов в рабочий пул.

В своих собственных реализациях я прошел путь использования сельдерея для обработки документов и использования gevent для загрузок. Который делает то же самое только с большей сложностью.

Вот простой пример.

import multiprocessing
from multiprocessing import Pool
import time
import typing

def work(doc: str) -> str:
    # do some processing here....
    return doc + " processed"

def download(url: str) -> str:
    return url  # a hack for demo, use e.g. `requests.get()`

def run_pipeline(
    urls: typing.List[str],
    session_request_limit: int = 10,
    session_length: int = 60,
) -> None:
    """
    Download and process each url in `urls` at a max. rate limit
    given by `session_request_limit / session_length`
    """
    workers = Pool(multiprocessing.cpu_count())
    results = []

    n_requests = 0
    session_start = time.time()

    for url in urls:
        doc = download(url)
        results.append(
            workers.apply_async(work, (doc,))
        )
        n_requests += 1

        if n_requests >= session_request_limit:
            time_to_next_session = session_length - time.time() - session_start
            time.sleep(time_to_next_session)

        if time.time() - session_start >= session_length:
            session_start = time.time()
            n_requests = 0

    # Collect results
    for result in results:
        print(result.get())

if __name__ == "__main__":
    urls = ["www.google.com", "www.stackoverflow.com"]
    run_pipeline(urls)
0 голосов
/ 26 марта 2019

Для ваших нужд есть библиотека, которая называется ratelimit

Пример с их домашней страницы:

Эта функция не сможет выполнить более 15 вызовов API в течение 15-минутного периода.

from ratelimit import limits

import requests

FIFTEEN_MINUTES = 900

@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
    response = requests.get(url)

    if response.status_code != 200:
        raise Exception('API response: {}'.format(response.status_code))
    return response

Кстати, в интенсивных задачах ввода-вывода (таких как сканирование веб-страниц) вы можете использовать многопоточность вместо многопроцессорной обработки. При использовании многопроцессорной обработки вы должны создать еще один процесс для управления и организовать все, что вы делаете. В случае многопоточного подхода все потоки по своей природе будут иметь доступ к памяти основного процесса, поэтому передача сигналов становится намного проще (так как e распределяется между потоками):

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...