Как использовать потоки в Python? - PullRequest
1156 голосов
/ 17 мая 2010

Я пытаюсь понять многопоточность в Python. Я посмотрел на документацию и примеры, но, честно говоря, многие примеры слишком сложны, и мне трудно их понять.

Как вы четко показываете задачи, разделенные для многопоточности?

Ответы [ 18 ]

16 голосов
/ 30 октября 2016

Использование сверкающего нового concurrent.futures module

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

Подход к исполнителю может показаться знакомым всем, кто до этого запачкал свои руки Java.

Также на заметку: чтобы сохранить разумность вселенной, не забудьте закрыть свои пулы / исполнителей, если вы не используете контекст with (что так здорово, что он делает это для вас)

15 голосов
/ 27 июля 2017

В большинстве документов и учебных пособий используется модуль Python Threading и Queue, который может показаться подавляющим для начинающих.

Возможно рассмотрим модуль concurrent.futures.ThreadPoolExecutor в python 3. В сочетании с with предложением и списком это может быть настоящим шармом.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of urls to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads 
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results 
        rs = f.result()
11 голосов
/ 03 мая 2018

Вот очень простой пример импорта CSV с использованием потоков.[Включение библиотеки может отличаться для разных целей]

Функции помощника:

from threading import Thread
from project import app 
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                #DB operation/query

Функция драйвера:

import_handler(csv_file_name) 
10 голосов
/ 19 апреля 2018

Я видел много примеров, где не выполнялась настоящая работа + они были в основном связаны с процессором. Вот пример задачи с привязкой к процессору, которая вычисляет все простые числа от 10 миллионов до 10,05 миллионов. Я использовал все 4 метода здесь

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    #Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        #Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        #Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            #If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        #If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    #I am merely printing the length of the array containing all the primes but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have 4 workers
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so lets split the min and max values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        #Start the thrread with the min and max split up to compute
        #Parallel computation will not work here due to GIL since this is a CPU bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    #Dont forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the min, max interval similar to the threading method above but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method but use thread pool executor this time
    This method is slightly faster than using pure threading as the pools manage threads more efficiently
    This method is still slow due to the GIL limitations since we are doing a CPU bound task
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method but use the process pool executor
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations
    RECOMMENDED METHOD FOR CPU BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

Вот результаты на моем компьютере с Mac OSX 4

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
7 голосов
/ 16 мая 2017

Многопоточность с простым примером, который будет полезен. Вы можете запустить его и легко понять, как многопоточная работа в Python. Я использовал блокировку для предотвращения доступа к другому потоку, пока предыдущие потоки не закончили свою работу. При использовании

tLock = threading.BoundedSemaphore (значение = 4)

в этой строке кода вы можете разрешить номера процессов одновременно и удерживать остаток потока, который будет запущен позже или после завершения предыдущих процессов.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()
3 голосов
/ 09 июля 2017

Ни одно из вышеперечисленных решений не использовало несколько ядер на моем сервере GNU / Linux (где у меня нет прав администратора). Они просто работали на одном ядре. Я использовал интерфейс нижнего уровня os.fork для запуска нескольких процессов. Вот код, который работал для меня:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break
2 голосов
/ 07 мая 2018
import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()
0 голосов
/ 03 июня 2019

Заимствуя у этот пост , мы знаем выбор между многопоточностью, многопроцессорностью и асинхронностью их использования.

Python3 имеет новую встроенную библиотеку для параллелизма и параллелизма: concurrent.futures

Итак, я продемонстрировал экспериментом, что четыре задачи (т.е. метод .sleep()) выполняются способом Threading-Pool:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

Из:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[ Примечание ]:

  • Как видно из приведенных выше результатов, лучшим вариантом было 3 рабочих четырех, что четыре задачи.
  • Если у вас есть задание процесса вместо привязки ввода / вывода (с использованием thread), вы можете изменить ThreadPoolExecutor на ProcessPoolExecutor
...