Простой способ запустить кусок кода Python параллельно? - PullRequest
0 голосов
/ 01 июля 2018

У меня есть очень простой код Python:

Test = 1;

def para():
   while(True):
      if Test > 10:
         print("Test is bigger than ten");
      time.sleep(1);

para(); # I want this to start in parallel, so that the code below keeps executing without waiting for this function to finish

while(True):
   Test = random.randint(1,42);
   time.sleep(1);

   if Test == 42:
       break;

...#stop the parallel execution of the para() here (kill it)

..some other code here

По сути, я хочу запустить функцию para () параллельно с другим кодом, чтобы приведенному ниже коду не приходилось ждать завершения para (). Однако я хочу иметь возможность доступа к текущему значению переменной Test внутри para (), когда она работает параллельно (как видно из примера кода выше). Позже, когда я решу, что я закончил с para (), работающим параллельно, я хотел бы знать, как убить его как из основного потока, так и из самого параллельно запущенного para () (self- прекратить).

Я читал некоторые учебники по многопоточности, но почти каждый учебник подходит к нему по-разному, плюс у меня были проблемы с пониманием некоторых из них, поэтому я хотел бы знать, какой самый простой способ запустить фрагмент кода параллельно.

Спасибо.

Ответы [ 2 ]

0 голосов
/ 01 июля 2018

Вместо ручного запуска потоков гораздо лучше просто использовать multiprocessing.pool. Многопроцессорная часть должна быть в функции, которую вы вызываете с помощью map. Вместо карты вы можете использовать pool.imap.

import multiprocessing
import time
def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

Также проверьте: multiprocessing.Pool: В чем разница между map_async и imap?

Также стоит обратить внимание на functools.partials, который можно использовать для передачи нескольких переменных (в дополнение к списку).

Еще одна хитрость: иногда вам действительно не нужна многопроцессорная обработка (как в нескольких ядрах вашего процессора), а только несколько потоков для одновременного запроса к базе данных со многими соединениями одновременно. В этом случае просто сделайте из multiprocessing.dummy import Pool и можете избежать запуска отдельного процесса в python (что приводит к потере доступа ко всем пространствам имен, которые вы не передаете в функцию), но сохраните все преимущества пула, просто в одном ядре процессора. Это все, что вам нужно знать о многопроцессорной обработке Python (с использованием нескольких ядер) и многопоточности (при использовании только одного процесса и сохранении целостности глобальной блокировки интерпретатора).

Еще один маленький совет: всегда старайтесь сначала использовать карту без каких-либо пулов. Затем переключитесь на pool.imap на следующем шаге, как только вы убедитесь, что все работает.

0 голосов
/ 01 июля 2018

Хорошо, во-первых, вот ответ на ваш вопрос, дословно и самым простым способом. После этого мы ответим немного более полно с двумя примерами, которые показывают два способа сделать это и разделить доступ к данным между основным и параллельным кодом.

import random

from threading import Thread
import time

Test = 1;
stop = False

def para():
   while not stop:
      if Test > 10:
         print("Test is bigger than ten");
      time.sleep(1);

# I want this to start in parallel, so that the code below keeps executing without waiting for this function to finish

thread = Thread(target=para)
thread.start()

while(True):
   Test = random.randint(1,42);
   time.sleep(1);

   if Test == 42:
       break;

#stop the parallel execution of the para() here (kill it)
stop = True
thread.join()

#..some other code here
print( 'we have stopped' )

А теперь, более полный ответ:

Далее мы покажем два примера кода (перечислены ниже), которые демонстрируют (а) параллельное выполнение с использованием интерфейса многопоточности и (б) использование многопроцессорного интерфейса. Какой из них вы решите использовать, зависит от того, что вы пытаетесь сделать. Потоки могут быть хорошим выбором, когда целью второго потока является ожидание ввода-вывода, а многопроцессорная обработка может быть хорошим выбором, когда второй поток предназначен для выполнения интенсивных вычислений ЦП.

В вашем примере основной код изменил переменную, а параллельный код только исследовал переменную. Если вы хотите изменить переменную из обоих, например, для сброса общего счетчика, все будет иначе. Итак, мы покажем вам, как это сделать.

В следующих примерах кодов:

1) Переменные " counter " и " run " и " lock " совместно используются основной программой и кодом, выполняемым параллельно.

2) Функция myfunc () , выполняется параллельно. Он циклически обновляет counter и спит до тех пор, пока основная программа не установит значение run в false.

3) Основная программа повторяет печать значения counter , пока не достигнет 5, после чего она сбрасывает счетчик. Затем, после того как он снова достигнет 5, он устанавливает run в false и, наконец, ожидает выхода потока или процесса, прежде чем покинуть себя.

Вы можете заметить, что counter увеличивается внутри вызовов до local.acquire () и lock.release () в первом примере или с lock во втором примере.

Увеличение счетчика состоит из трех этапов: (1) считывание текущего значения, (2) добавление к нему одного и затем (3) сохранение результата обратно в счетчик. Проблема возникает, когда один поток пытается установить счетчик в то же время, когда это происходит.

Мы решаем эту проблему, когда основная программа и параллельный код получают блокировку до того, как они изменят переменную, а затем освобождают , когда они будут выполнены. Если блокировка уже снята, программа или параллельный код ждут, пока она не будет снята. Это синхронизирует их доступ для изменения общих данных, то есть счетчика. (Кроме того, см. семафор для другого вида синхронизации).

С этим введением, вот первый пример, который использует потоки:

# Parallel code with shared variables, using threads
from threading import Lock, Thread
from time import sleep

# Variables to be shared across threads
counter = 0
run = True
lock = Lock()

# Function to be executed in parallel
def myfunc():

    # Declare shared variables
    global run
    global counter
    global lock

    # Processing to be done until told to exit
    while run:
        sleep( 1 )

        # Increment the counter
        lock.acquire()
        counter = counter + 1
        lock.release()

    # Set the counter to show that we exited
    lock.acquire()
    counter = -1
    lock.release()
    print( 'thread exit' )

# ----------------------------

# Launch the parallel function as a thread
thread = Thread(target=myfunc)
thread.start()

# Read and print the counter
while counter < 5:
    print( counter )
    sleep( 1 )

# Change the counter    
lock.acquire()
counter = 0
lock.release()

# Read and print the counter
while counter < 5:
    print( counter )
    sleep( 1 )

# Tell the thread to exit and wait for it to exit
run = False
thread.join()

# Confirm that the thread set the counter on exit
print( counter )

А вот второй пример, который использует многопроцессорность. Обратите внимание, что для доступа к общим переменным требуется несколько дополнительных шагов.

from time import sleep
from multiprocessing import Process, Value, Lock

def myfunc(counter, lock, run):

    while run.value:
        sleep(1)
        with lock:
            counter.value += 1
            print( "thread %d"%counter.value )

    with lock:
        counter.value = -1
        print( "thread exit %d"%counter.value )

# =======================

counter = Value('i', 0)
run = Value('b', True)
lock = Lock()

p = Process(target=myfunc, args=(counter, lock, run))
p.start()

while counter.value < 5:
    print( "main %d"%counter.value )
    sleep(1)

with lock:
    counter.value = 0

while counter.value < 5:
    print( "main %d"%counter.value )
    sleep(1)

run.value = False

p.join()

print( "main exit %d"%counter.value)
...