RuntimeError: невозможно запустить новый поток, используя ThreadPool и только несколько потоков - PullRequest
0 голосов
/ 02 мая 2018

Иногда я получаю RuntimeError (я бы сказал, не более 1% времени) при использовании ThreadPool из multiprocessing.pool в Python.

Я читал, что это происходит, если кто-то пытается открыть сотни потоков. В моем случае это должно быть максимум 4 потока, поэтому я немного запутался, почему это происходит.

Я использовал ранее в точно такой же среде ThreadPool с 3 потоками и никогда не получал ошибку.

Мой код:

import time
from multiprocessing.pool import ThreadPool

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched) #list of dicts

    if pending_updates:
        prio = pending_updates[0]['prio'] #variable number between 0 and 4 (edited from original question)

        if prio > 3:
            qty_threads = 1

        elif prio == 0 or prio == 1:
            qty_threads = 4

        else:
            qty_threads = 3

        pool = ThreadPool(qty_threads)
        pool.map(self.run_update_NEW, pending_updates) #a list of 6 dicts will be given to the pool of 1, 3 or 4 threads

    else:
        time.sleep(2)

И трассировка:

...
pool = ThreadPool(qty_threads) 
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 789, in __init__ 
Pool.__init__(self, processes, initializer, initargs) 
File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 192, in __init__ 
self._task_handler.start() 
File "/app/.heroku/python/lib/python3.6/threading.py", line 846, in start 
_start_new_thread(self._bootstrap, ()) 
RuntimeError: can't start new thread 

Есть идеи, что это за проблема?


Попытка:

С здесь Я узнал о ThreadPoolExecutor.

Я решил попробовать:

import time
from concurrent.futures import ThreadPoolExecutor

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched) #list of dicts

    if pending_updates:
        prio = 2 #some variable number between 0 and 4

        if prio > 3:
            qty_threads = 1

        elif prio == 0 or prio == 1:
            qty_threads = 4

        else:
            qty_threads = 3

        #the following lines changed
        with ThreadPoolExecutor(max_workers=qty_threads) as e:
            for pu in pending_updates:
                e.submit(self.run_update_NEW, pu)

    else:
        time.sleep(2)

Я буду держать сообщение обновленным, объясняя, если это работает.

1 Ответ

0 голосов
/ 02 мая 2018

Проблема, которую я вижу в вашем коде, заключается в том, что у вас есть бесконечный цикл while True, в котором вы создаете свой пул, но фактически никогда не закрываете его. Теперь вы продолжаете создавать пулы, но поскольку вы никогда не закрываете и не присоединяетесь к пулу, «старые» потоки, скорее всего, просто будут висеть там, и через долю секунды вы создадите их больше. Я предполагаю, что вы в конечном итоге просто исчерпали свои ресурсы и где-то достигли лимита процесса или ядра.

Я бы переместил создание пула за пределы цикла while и просто продолжал использовать тот же пул в вашем цикле. В этом вся идея пула - иметь процессы или потоки, ожидающие появления работы, устраняя накладные расходы на создание процессов / потоков при запуске повторяющихся задач.

Если есть причина для повторного запуска пула (я не могу понять, что это может быть - если вам нужно время от времени обновлять своих работников, вы можете использовать maxtasksperchild в декларации вашего пула), то по крайней мере закройте старый пул правильно, так как вы больше не будете кормить его работой.

...