Как работает пул потоков, и как реализовать его в async / await env, например NodeJS? - PullRequest
7 голосов
/ 16 марта 2019

Мне нужно запустить функцию int f(int i) с параметрами 10_000, и для ее выполнения требуется около 1 секунды из-за времени ввода-вывода.
На таком языке, как Python, я могу использовать потоки (или async/await, я знаю, но я поговорю об этом позже), чтобы распараллелить эту задачу.
Если я хочу всегда иметь 10 работающих потоков и разделить задачу между ними, я могу использовать ThreadingPool :

def f(p):
    x = [...]
    return x

p = ThreadPool()
xs = p.map(f, range(10_000))

Но как это работает ? Если я хочу реализовать подобное с помощью, скажем, NodeJS и f = http("www.google.com", callback), с чего мне начать? Какие алгоритмы для такого рода проблем?
Опять же, я хотел бы получить 10 запросов одновременно, и когда один закончится, должен начаться следующий.

То, о чем я думал до сих пор (безобразно, потому что обратный вызов запускает новый вызов функции f ()):

queue = ["www.google.com", "www.facebook.com"]
var f = function(url) {
  http.get(url, (e) => {
    const newUrl = queue.pop();
    f(newUrl);
  });
};

for (var i = 0; i < 10; i++) {
  f(queue.pop());
}

Ответы [ 6 ]

4 голосов
/ 16 марта 2019

Повторная реализация той функции Bluebird, с которой я связался:

const mapWithConcurrency = async (values, concurrency, fn) => {
    let i = 0;
    let results = values.map(() => null);

    const work = async () => {
        while (i < values.length) {
            const current = i++;
            results[current] = await fn(values[current]);
        }
    };

    await Promise.all(Array.from({length: concurrency}, work));

    return results;
};

mapWithConcurrency(Array.from({length: 30 * 15}, (_, i) => i), 10, async i => {
    const el = document.body.appendChild(document.createElement('i'));
    el.style.left = 5 * (i % 30) + 'px';
    el.style.top = 5 * (i / 30 | 0) + 'px';
    await new Promise(resolve => { setTimeout(resolve, Math.random() * 500); });
    el.style.background = 'black';
    return 2 * i;
}).then(results => {
    console.log(results.length, results.every((x, i) => x === 2 * i));
});
i {
    background: grey;
    transition: background 0.3s ease-out;
    position: absolute;
    width: 5px;
    height: 5px;
}
2 голосов
/ 16 марта 2019

Не уверен, что именно так реализованы ThreadPool и другие библиотеки, но вот подсказка: используйте Queues для подсчета количества выполняемых задач / потоков.
Я не пробовал этот код, но он может дать вам представление:мы создаем поток, проверяющий каждую 0,2 секунду, если мы должны запустить другой поток.
Это подразумевает большое переключение контекста, однако и может быть неэффективным.

class Pool:
    def __init__(self, func: Callable, params: list, thread_max = 10):
        self.func = func
        self.params = params
        self.running = 0
        self.finished = []
        self.thread_max = thread_max
        self.threads = []

    def start(self):
        Thread(target=check, args=(0.2)).start()

    def check(self, t_sleep=0.5):
        done = False
        while not done:
            sleep(t_sleep)
            # first check for finished threads
            for t in threads:
                if not t.isAlive():
                    # do something with return value
                    # ...
                    self.threads.remove(t)

            if not len(self.params): # mean there is no more task left to LAUNCH
                done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
                continue # avoid the next part (launching thread)

            # now start some threads if needed
            while len(self.threads) < self.thread_max:
                arg = self.params.pop()
                thread = Thread(target=self.func, args=(arg, ))
                threads.insert(thread)
                thread.start()

У меня, однако, нет никакой подсказки для асинхронности/ await (ключевые слова теперь доступны в python)

0 голосов
/ 05 апреля 2019

Взгляните на мой недавно опубликованный модуль: Параллельный контроллер

Может вызывать функции одновременно с заданной степенью параллелизма.

0 голосов
/ 02 апреля 2019

Чтобы иметь такое же поведение, как у nodejs, вы должны использовать реактивное x программирование. То, что вы ищете, это RXPY. https://github.com/ReactiveX/RxPY

0 голосов
/ 02 апреля 2019

Поздний ответ, но способ, которым я обычно обрабатываю несколько потоков с максимальным пределом потока X, выглядит следующим образом:

import threading
import requests, json
import time
from urllib.parse import urlparse

final_dict = {} # will hold final results

def parser(u):
    try:
        parsed_uri = urlparse(u) # parse url to get domain name that'l be used as key in final_dict
        domain = "{uri.netloc}".format(uri=parsed_uri)
        x = requests.get(u)
        status_code = x.status_code
        headers = x.headers
        cookies = x.cookies
        # OR cookies = ";".join(f"{k}:{v}" for k,v in x.cookies.iteritems())
        html = x.text
        # do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
        if not domain in final_dict:
            final_dict[domain] = []
        final_dict[domain].append( {'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html} )

    except Exception as e:
        pass
        print(e)
        return {}

max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']

for u in urls:
    threading.Thread(target=parser, args=[u]).start()
    tc = threading.active_count()
    while tc == max_threads:
        tc = threading.active_count()
        time.sleep(0.2)

while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
    tc = threading.active_count()
    time.sleep(0.2)

print(json.dumps(final_dict))

'''
# save to file
with open("output.json", "w") as f:
    f.write(json.dumps(final_dict))

# load from file
with open("output.json") as f:
    _json = json.loads(f.read())
'''

Выход:

  1. Пожалуйста, проверьте json, сгенерированный выше по адресу: https://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd
  2. Приведенный выше код, в некотором роде, «мой собственный код», и под этим я подразумеваю, что он использовался в предыдущем проекте и может не полностью ответить на ваш вопрос, тем не менее, надеюсь, что это хороший ресурс для будущих пользователей.
  3. Вкл. Linux Я обычно устанавливаю max_threads на 250 и на Windows на около 150.

enter image description here

0 голосов
/ 02 апреля 2019

В python пул потоков использует только 1 процессорное ядро. Но поскольку ваша задача ограничена вводом / выводом, она будет лучше, чем последовательное выполнение вызовов функции 10 КБ.

Чтобы сделать лучше, вы можете попробовать пул процессов, который может использовать несколько ядер. Или даже объединить асинхронность с процессами. В зависимости от вашей проблемы, может быть или не быть дальнейшего ускорения при использовании этих двух подходов, используя пул потоков в качестве базового уровня.

См. этот пример объединения потока / процесса с asyncio . Это должно работать для вашего случая напрямую. Ваша функция f эквивалентна их функции block.

В Python 3.6 общей формой асинхронного кода является создание цикла событий для запуска асинхронной функции. Очень простой пример:

import asyncio

async def coroutine():
    print('in coroutine')

coro = coroutine()
event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(coro)
event_loop.close()

Для простоты вы можете думать, что возвращение функции async def является чем-то, что должно быть выполнено (сопрограмма), и цикл выполняет это. Если существует N задач, которые должны выполняться асинхронно, вы можете определить их с помощью N async def функций, а другая - await s. Эта самая последняя функция async определяет, что означает «конец» для N задач. Например, может быть, «конец» означает, что все N задач выполнены, или когда 1 из них выполнена, и т. Д. И цикл выполняет эту N + 1-ю функцию.

В Python 3.7 API-интерфейсы asyncio немного изменились, и цикл не нужно создавать явно. Вы можете найти несколько примеров в моем блоге .

...