threading.Thread()
создает новый поток, а t1.start()
просто отправляет его.
Этот код:
for num in range(0,10):
t1 = threading.Thread(target=test1, args=(num,))
t2 = threading.Thread(target=test2, args=(num,))
t1.start()
t2.start()
фактически создает и запускает 2 новых потока за итерацию. В конце у вас есть 20 потоков + основной поток.
Также, когда вы запускаете поток, вы должны подождать, пока он не закончится, или запустить его как поток демона. С демоном нить вы говорите, что мне все равно, что вы делаете и когда вы заканчиваете.
Базовое использование потоков может выглядеть так:
import threading
def do_stuff():
print("Stuff on thread {}".format(threading.get_ident()))
print("Main thread {}".format(threading.get_ident()))
t = threading.Thread(target=do_stuff) # Specify what should be running in new thread
t.start() # Dispatch thread
t.join() # Wait until the thread is done
Примечание: threading.get_ident()
дает вам уникальный идентификатор потока, в котором вызывается эта функция.
Теперь из вашего примера, если вы хотите запустить 2 независимых потока, вы можете сделать это:
import threading
import time
def test2():
for count in range(0, 10):
if count == 8:
print("test2: sleep for 4 sec")
time.sleep(3.0)
print("test2: thread = {}".format(count))
def test1():
for count in range(0, 10):
if count == 5:
print("test 1: sleep for 5 sec")
time.sleep(3.0)
print("test1: thread = {}".format(count))
t1 = threading.Thread(target=test1)
t2 = threading.Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()
Но, возможно, вы захотите синхронизировать эти потоки и отправить им какой-то элемент в одно и то же время.
import threading
# Create threads
t1 = threading.Thread(target=test1)
t2 = threading.Thread(target=test2)
# Run threads
t1.start()
t2.start()
# Go through some list or whatever
for num in range(0,10):
# send num to t1
# send num to t2
# wait for t1 and t2
pass
# Wait until threads are finished with their jobs
t1.join()
t2.join()
Для отправки значения в другой поток мы можем использовать queue.Queue. Вы можете безопасно поместить это значение в один поток, а второй поток может прочитать его или подождать, пока что-то появится (или несколько потоков могут писать, а несколько потоков могут читать).
import threading
import time
import queue
def test2(q):
while True:
count = q.get() # Get data from the q2 queue
if count == 8:
print("test2: sleep for 4 sec")
time.sleep(3.0)
print("test2: thread = {}".format(count))
def test1(q):
while True:
count = q.get() # Get data from the q1 queue
if count == 5:
print("test 1: sleep for 5 sec")
time.sleep(3.0)
print("test1: thread = {}".format(count))
# Creates queues
q1 = queue.Queue()
q2 = queue.Queue()
# Create threads
t1 = threading.Thread(target=test1, args=(q1, ))
t2 = threading.Thread(target=test2, args=(q2, ))
# Run threads
t1.start()
t2.start()
# Go through some list or whatever
for num in range(0, 10):
# send num to t1
q1.put(num)
# send num to t2
q2.put(num)
# wait for t1 and t2
# ???
# Wait until threads are finished with their jobs
t1.join()
t2.join()
Ой, подождите ... как мы можем знать, что потоки закончили свою работу, и мы можем отправить другое значение? Ну, мы можем использовать Queue
снова. Создайте новую пару и отправьте, например, True
в конце функции test?
и затем ждать чтения в основном цикле из этих очередей. Но для отправки информации о состоянии мы должны использовать threading.Event
.
import threading
import time
import queue
def test2(q, e):
while True:
count = q.get() # Get data from the q2 queue
if count == 8:
print("test2: sleep for 4 sec")
time.sleep(3.0)
print("test2: thread = {}".format(count))
e.set() # Inform master the processing of given value is done
def test1(q, e):
while True:
count = q.get() # Get data from the q1 queue
if count == 5:
print("test 1: sleep for 5 sec")
time.sleep(3.0)
print("test1: thread = {}".format(count))
e.set() # Inform master the processing of given value is done
# Creates queues
q1 = queue.Queue()
q2 = queue.Queue()
# Create events
e1 = threading.Event()
e2 = threading.Event()
# Create threads
t1 = threading.Thread(target=test1, args=(q1, e1))
t2 = threading.Thread(target=test2, args=(q2, e2))
# Run threads
t1.start()
t2.start()
# Go through some list or whatever
for num in range(0, 10):
# send num to t1
q1.put(num)
# send num to t2
q2.put(num)
# wait for t1
e1.wait()
# wait for t2
e2.wait()
# Wait until threads are finished with their jobs
t1.join()
t2.join()
Сейчас мы почти на месте, но сценарий никогда не заканчивается. Это связано с тем, что test?
функции (потоки) в бесконечном цикле ждут данных (из очередей q1 / q2). Нам нужен какой-то способ, как сказать им: «Хорошо, это все, ребята». Для этого мы можем сказать, что значение None в очередях означает конец. Результат следующий:
import threading
import time
import queue
def test2(q, e):
while True:
count = q.get() # Get data from the q2 queue
if count is None: # Exit on None value
return
if count == 8:
print("test2: sleep for 4 sec")
time.sleep(3.0)
print("test2: thread = {}".format(count))
e.set() # Inform master the processing of given value is done
def test1(q, e):
while True:
count = q.get() # Get data from the q1 queue
if count is None: # Exit on None value
return
if count == 5:
print("test 1: sleep for 5 sec")
time.sleep(3.0)
print("test1: thread = {}".format(count))
e.set() # Inform master the processing of given value is done
# Creates queues
q1 = queue.Queue()
q2 = queue.Queue()
# Create events
e1 = threading.Event()
e2 = threading.Event()
# Create threads
t1 = threading.Thread(target=test1, args=(q1, e1))
t2 = threading.Thread(target=test2, args=(q2, e2))
# Run threads
t1.start()
t2.start()
# Go through some list or whatever
for num in range(0, 10):
# send num to t1
q1.put(num)
# send num to t2
q2.put(num)
# wait for t1
e1.wait()
# wait for t2
e2.wait()
# Inform threads to exit
q1.put(None)
q2.put(None)
# Wait until threads are finished with their jobs
t1.join()
t2.join()
Примечание: вместо использования параметров в «основных» функциях потоков вы можете использовать глобальные переменные, поскольку глобальные переменные или атрибуты класса являются общими для всех потоков. Но обычно это плохая практика.
Имейте в виду, что ошибки должны быть связаны с потоками, например, обработка исключений не так проста. Представьте, что функция test1
вызывает исключение перед вызовом e.set()
. Тогда главный поток никогда не останавливается в ожидании e1.wait()
.
Кроме того, CPython (наиболее распространенная реализация Python) имеет что-то под названием GIL , что в основном (за некоторыми исключениями) позволяет запускать только 1 поток за раз, а остальные спят.
Документация по потокам
Документация очереди