Превращение многопоточного кода с неограниченным количеством потоков в многопоточный код с максимальным количеством одновременно работающих потоков - PullRequest
0 голосов
/ 11 октября 2019

У меня есть скрипт, который выполняет определенную функцию многопоточностью. Теперь интересно иметь столько же параллельных потоков, сколько и процессорных ядер. Теперь текущий код ( 1: ) с использованием оператора threading.thread создает 1000 потоков и запускает их все одновременно. Я хочу превратить это во что-то, что запускает только фиксированное количество потоков одновременно (например, 8) и помещает остальное в очередь до тех пор, пока исполняемое ядро ​​потока / процессора не станет бесплатным для использования.

1:

import threading

nSim = 1000

def simulation(i):
    print(str(threading.current_thread().getName()) + ': '+ str(i))

if __name__ == '__main__':
    threads = [threading.Thread(target=simulation,args=(i,)) for i in range(nSim)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Q1: Код 2: делает то, что я описал? (многопоточность с максимальным количеством одновременно работающих потоков) Это правильно? (Я так думаю, но я не уверен на 100%)

Q2: Теперь код запускает 1000 потоков одновременно и выполняет их на 8 потоках. Есть ли способ инициировать новый поток только тогда, когда исполняемое ядро ​​потока / процессора свободно для использования (для того, чтобы у меня не было 990 потоков вызовов, ожидающих с самого начала выполнения, когда это возможно?

Q3: Есть ли способ отследить, какое ядро ​​процессора выполнило какой поток? Просто для доказательства того, что код делает то, что должен.

2:

import threading
import multiprocessing

print(multiprocessing.cpu_count())
from concurrent.futures import ThreadPoolExecutor

nSim = 1000

def simulation(i):
    print(str(threading.current_thread().getName()) + ': '+ str(i))

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=8) as executor:
        for i in range (nSim):
            res = executor.submit(simulation, i)
            print(res.result())

Ответы [ 2 ]

1 голос
/ 11 октября 2019

A1: Нет, ваш код отправляет задачу, получает Future в res, а затем вызывает result, который ожидает результата. Только после того, как было выполнено предыдущее задание, новое задание передается потоку. Только один из рабочих потоков действительно работает за один раз.

Взгляните на ThreadPool.map (фактически Pool.map) вместо submit, чтобы распределить задачи среди рабочих.

A2: только 8 потоков (количество рабочих) используются здесь максимум. При использовании map входные данные 1000 задач могут быть сохранены (требуется память), но дополнительные потоки не создаются.

A3: Не знаю, о чем я знаю. Поток не привязан к ядру, он может быстро переключаться между ними.

1 голос
/ 11 октября 2019

A1: чтобы ограничить количество потоков, которые могут одновременно иметь доступ к какому-либо ресурсу, вы можете использовать многопоточность. Семафор На самом деле 1000 потоков не дадут вам огромного прироста скорости, рекомендуемое количество потоков на процесс - mp.cpu_count ()* 1 или mp.cpu_count () * 2 в некоторых статьях. Также обратите внимание, что потоки хороши для операций ввода-вывода в python, но не для вычислений из-за GIL.

A2. Зачем вам столько потоков, если вы хотите запускать только 8 из них одновременно? Создайте только 8 потоков, а затем предоставьте им Задачи, когда Задачи будут готовы, для этого вам нужно использовать queue.Queue (), которая является поточно-ориентированной. Но в вашем конкретном примере вы можете сделать следующее, чтобы выполнить свой тест 250 раз для каждого потока, используя функцию внутри симуляции, кстати, вам не нужен семафор в этом случае.

A3. Когда мы говорим о многопоточности, у вас есть один процесс с несколькими потоками.

import threading<br>
import time<br>
import multiprocessing as mp                                                                                            </p>

<p>def simulation(i, _s):<br>
    # s is threading.Semaphore()<br>
    with _s:<br>
        print(str(threading.current_thread().getName()) + ': ' + str(i))<br>
        time.sleep(3)                                                                                                   </p>

<p>if <strong>name</strong> == '<strong>main</strong>':<br>
    print("Cores number: {}".format(mp.cpu_count()))<br>
    # recommended number of threading is mp.cpu_count()*1 or mp.cpu_count()*2 in some articles<br>
    nSim = 25                                                                                                           </p>

<code>s = threading.Semaphore(4)  # max number of threads which can work simultaneously with resource is 4                
threads = [threading.Thread(target=simulation, args=(i, s, )) for i in range(nSim)]                                 

for t in threads:                                                                                                   
    t.start()                                                                                                       

# just to prove that all threads are active in the start and then their number decreases when the work is done      
for i in range(6):                                                                                                  
    print("Active threads number {}".format(threading.active_count()))                                              
    time.sleep(3)                                                                                                                                                                                                                                                                                                                         
</code>

...