Как использовать потоки в Python? - PullRequest
1156 голосов
/ 17 мая 2010

Я пытаюсь понять многопоточность в Python. Я посмотрел на документацию и примеры, но, честно говоря, многие примеры слишком сложны, и мне трудно их понять.

Как вы четко показываете задачи, разделенные для многопоточности?

Ответы [ 18 ]

1286 голосов
/ 11 февраля 2015

С тех пор, как этот вопрос был задан в 2010 году, произошло реальное упрощение того, как сделать простую многопоточность с python с map и pool *. * 1009

Приведенный ниже код взят из статьи / поста в блоге, который вы обязательно должны проверить (без принадлежности) - Параллельность в одной строке: Лучшая модель для повседневных задач . Я подведу итог ниже - это всего лишь несколько строк кода:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

Какая многопоточная версия:

results = []
for item in my_array:
    results.append(my_function(item))

Описание

Map - это классная маленькая функция и ключ к легкому внедрению параллелизма в ваш код Python. Для тех, кто незнаком, map - это что-то из функциональных языков, таких как Lisp. Это функция, которая отображает другую функцию в последовательности.

Map обрабатывает для нас итерации последовательности, применяет функцию и сохраняет все результаты в удобном списке в конце.

enter image description here


Осуществление

Параллельные версии функции map предоставляются двумя библиотеками: многопроцессорность, а также ее малоизвестный, но не менее фантастический шаг потомка: multiprocessing.dummy.

multiprocessing.dummy - это то же самое, что и многопроцессорный модуль, , но вместо него используются потоки ( важное отличие - использовать несколько процессов для задач с интенсивным использованием ЦП; потоки для ( и во время) IO ):

multiprocessing.dummy копирует API многопроцессорной обработки, но является не более чем оболочкой для модуля потоков.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

И результаты расчета времени:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Передача нескольких аргументов (работает так только в Python 3.3 и более поздних версиях ):

Чтобы передать несколько массивов:

results = pool.starmap(function, zip(list_a, list_b))

или передать константу и массив:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Если вы используете более раннюю версию Python, вы можете передать несколько аргументов через этот обходной путь .

(спасибо user136036 за полезный комментарий)

700 голосов
/ 17 мая 2010

Вот простой пример: вам нужно попробовать несколько альтернативных URL-адресов и вернуть содержимое первого ответа.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

Это тот случай, когда многопоточность используется в качестве простой оптимизации: каждая вложенная тема ожидает разрешения и ответа URL-адреса, чтобы поместить его содержимое в очередь; каждый поток является демоном (процесс не будет продолжаться, если основной поток завершится - это более распространено, чем нет); основной поток запускает все подпотоки, делает get в очереди, чтобы подождать, пока один из них не выполнил put, затем выдает результаты и завершает работу (что снимает все подпотоки, которые могут все еще выполняться, так как они темы демона).

Правильное использование потоков в Python неизменно связано с операциями ввода-вывода (поскольку CPython в любом случае не использует несколько ядер для выполнения задач, связанных с ЦП, единственная причина для многопоточности - не блокирование процесса, пока есть ожидание некоторых I / O). Между прочим, очереди почти всегда являются наилучшим способом перераспределения работы между потоками и / или сбора результатов работы, и они по сути являются поточно-ориентированными, поэтому избавляют вас от беспокойства о блокировках, условиях, событиях, семафорах и других Концепции координации потоков / коммуникации.

247 голосов
/ 17 мая 2010

ПРИМЕЧАНИЕ : Для фактического распараллеливания в Python вы должны использовать модуль multiprocessing для ветвления нескольких процессов, которые выполняются параллельно (из-за глобальной блокировки интерпретатора потоки Python обеспечивают чередование, но фактически выполняются последовательно, а не параллельно и полезны только при чередовании операций ввода-вывода).

Однако, если вы просто ищете чередование (или выполняете операции ввода-вывода, которые можно распараллелить, несмотря на глобальную блокировку интерпретатора), то модуль threading - это место для запуска. В качестве очень простого примера, давайте рассмотрим проблему суммирования большого диапазона путем суммирования поддиапазонов параллельно:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Обратите внимание, что вышеприведенный пример является очень глупым, поскольку он абсолютно не выполняет ввод-вывод и будет выполняться последовательно, хотя и с чередованием (с дополнительными издержками переключения контекста) в CPython из-за глобальной блокировки интерпретатора.

98 голосов
/ 09 марта 2012

Как и другие упомянутые, CPython может использовать потоки только для ожидания ввода-вывода из-за GIL. Если вы хотите использовать несколько ядер для задач, связанных с процессором, используйте multiprocessing :

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
89 голосов
/ 23 сентября 2013

Просто примечание, очередь не требуется для многопоточности.

Это самый простой пример, который я могу себе представить, который показывает 10 процессов, запущенных одновременно.

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"
47 голосов
/ 01 октября 2013

Ответ от Алекса Мартелли помог мне, однако вот измененная версия, которая, на мой взгляд, была более полезной (по крайней мере, для меня).

Обновлено: работает как в python2, так и в python3

try:
    # for python3
    import queue
    from urllib.request import urlopen
except:
    # for python2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()
23 голосов
/ 07 июня 2014

Я нашел это очень полезным: создать столько потоков, сколько ядер, и позволить им выполнять (большое) количество задач (в данном случае, вызывая программу оболочки):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done
22 голосов
/ 16 марта 2017

Учитывая функцию, f, проделайте это следующим образом:

import threading
threading.Thread(target=f).start()

Для передачи аргументов f

threading.Thread(target=f, args=(a,b,c)).start()
19 голосов
/ 20 июля 2017

Python 3 имеет возможность Запуск параллельных задач . Это делает нашу работу проще.

Имеется пул потоков и Пул процессов .

Следующее дает понимание:

Пример ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
18 голосов
/ 14 апреля 2013

Для меня идеальным примером потоков является мониторинг асинхронных событий. Посмотрите на этот код.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Вы можете поиграть с этим кодом, открыв сеанс IPython и выполнив что-то вроде:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Подождите несколько минут

>>>a[0] = 2
Mon = 2
...