Предельное количество активных тем Python - PullRequest
1 голос
/ 13 апреля 2020

У меня есть последовательная модель потребителя-производителя, выполнение которой занимает много времени. Поэтому я пытаюсь заставить потребительский код работать одновременно.

Примечание: объекты - это генератор.

func report_object(self, object_type, objects):
    for obj in objects:
        try:
            change_handler(obj, self.config)
        except Exception as e:
            LOG.error("Error occurred in handling object: %s" % e)
            LOG.exception(e)
    else:
        LOG.info(" Consumer: no objects reported")

Потоковая реализация вышеуказанной функции:

import threading

func report_object(self, object_type, objects):
    threads = []
    for obj in objects:
        try:
            t = threading.Thread(target=change_handler,args=(obj, self.config))
            LOG.info(" ***** Number of active threads: %d *****", threading.activeCount())
            t.start()
            threads.append(t)
        except Exception as e:
            LOG.error("Error occurred in handling object: %s" % e)
            LOG.exception(e)
   for t in threads: 
      t.join()
   else:
       LOG.info(" Consumer: no objects reported")

Если следовать вышеуказанному механизму, я запускаю столько потоков, сколько len (объектов). В этом случае, если объекты станут такими огромными, как 1000/10000, каково будет влияние? Будет ли состояние гонки? Если да, то как я могу предотвратить это? Я попробовал другое решение, например:

threads = [ threading.Thread(target=change_handler,args=(obj, self.config)) for _ in range(8)]
for thread in threads:
    thread.start()
    LOG.info(thread.name)


for thread in threads:
    thread.join()

Количество активных потоков все еще увеличивается. Что было бы лучшим способом ограничить количество активных потоков и лучшим способом заставить вышеупомянутую функцию работать одновременно.

1 Ответ

1 голос
/ 13 апреля 2020

Лучший способ контролировать количество потоков - это использовать ThreadPoolExecutor из пакета concurrent.futures, и для этого есть несколько способов. Одним из способов является использование метода submit, который возвращает объект Future, представляющий будущее завершение потока. Если поток возвращает результат, вы можете вызвать метод result для этого объекта, который будет блокироваться до завершения вызова, а затем возвращает значение, возвращаемое из вызова (есть, конечно, много других методов, которые вы можете вызвать в Future объект). Вы не обязаны сохранять объект Future, если поток не возвращает значение или вам не нужно иным образом проверять успешное завершение.

Вот пример использования ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import time, random

def my_thread(n):
    time.sleep(random.random())
    return n, time.time()

MAX_THREADS = 10

with ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
    futures = [e.submit(my_thread, n) for n in range(15)]
    for f in futures:
        print(f.result())

Отпечатки:

(0, 1586782110.1816075)
(1, 1586782109.4404495)
(2, 1586782109.6663365)
(3, 1586782109.8307955)
(4, 1586782109.6733325)
(5, 1586782109.6103601)
(6, 1586782109.3914738)
(7, 1586782109.6803281)
(8, 1586782109.8587916)
(9, 1586782109.7173235)
(10, 1586782110.3664994)
(11, 1586782110.1816075)
(12, 1586782110.518443)
(13, 1586782110.4524374)
(14, 1586782110.0256832)
...