Ваша первая проблема заключается в том, что вы уже вызывали свою функцию в MainThread
с вызовом:
pool.apply_async(test_post())
... вместо передачи test_post
в качестве аргумента для вызова, который будет выполнен вworker-thread with:
pool.apply_async(test_post)
OP: У меня есть довольно хороший скрипт, использующий Threading, но потом я прочитал, что ему потребуется ручное кодирование для поддержки n числа одновременных потоков(имеется в виду, начиная новые, как только старые закончат) ...
Необходимо различать единицу работы (задание, задание) и поток.Весь смысл использования пула в первую очередь заключается в повторном использовании исполнителей, будь то потоки или процессы.Рабочие уже созданы, когда создается экземпляр пула, и пока вы не закроете пул, все начальные потоки останутся живыми.Таким образом, вы не заботитесь о воссоздании потоков, вы просто вызываете методы пула существующего пула так часто, как у вас есть работа, которую вы хотите распределить.Пул принимает эти задания (вызов метода пула) и создает из него задачи.Эти задачи помещаются в неограниченную очередь.Всякий раз, когда рабочий завершает задание, он будет блокировать get()
новую задачу из такого inqueue
.
OP: Пул выполняет только один поток вместо 4... Я пробовал разные способы, и он все еще делает это только один раз.
pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
... - это задание, выполняемое одним вызовом и выполняющее одну задачу.Если вам нужно более одного исполнения func
, вам нужно либо вызывать pool.apply_async()
несколько раз, либо вы используете метод пула отображения, такой как
pool.map(func, iterable, chunksize=None)
..., который отображает одну функциюнад итеративным.pool.apply_async
не блокирует, поэтому это "асинхронный".Он немедленно возвращает AsyncResult
-объект, который вы можете (блокирующе) вызвать .wait()
или .get()
после.
Из комментариев стало ясно, что вы хотите бесконечный и немедленные замены для завершенных задач (самодельный поток ввода) ... и программа должна остановиться на KeyboardInterrupt или когда результат не имеет определенного значения.
Вы можете использовать callback
-параметр apply_async
, чтобы планировать новые задачи, как только любые старых завершены.Сложность заключается в том, что делать с MainThread, чтобы предотвратить преждевременное завершение всего скрипта при сохранении его отзывчивости для KeyboardInterrupt.Разрешение сна MainThread в цикле позволяет ему немедленно реагировать на KeyboardInterrupt, предотвращая преждевременный выход.В случае, если результат должен остановить программу, вы можете позволить обратному вызову завершить пул.Затем MainThread просто должен включить проверку состояния пула в своем цикле сна.
import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool
def test_post(post_id):
time.sleep(randint(1, 3))
status_code = choice([200] * 9 + [404])
return "{} {} Message no.{}: {}".format(
datetime.now(), current_thread().name, post_id, status_code
), status_code
def handle_result(result):
msg, code = result
print(msg)
if code != 200:
print("terminating")
pool.terminate()
else:
pool.apply_async(
test_post, args=(next(post_cnt),), callback=handle_result
)
if __name__ == '__main__':
N_WORKERS = 4
post_cnt = count()
pool = ThreadPool(N_WORKERS)
# initial distribution
for _ in range(N_WORKERS):
pool.apply_async(
test_post, args=(next(post_cnt),), callback=handle_result
)
try:
while pool._state == 0: # check if pool is still alive
time.sleep(1)
except KeyboardInterrupt:
print(" got interrupt")
Пример вывода с KeyboardInterrupt:
$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt
Пример Вывод с завершением из-за нежелательноговозвращаемое значение:
$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating
Обратите внимание, что в вашем сценарии вы также можете вызывать apply_async
чаще, чем N_WORKERS
, чтобы ваш первоначальный дистрибутив имел некоторый буфер для уменьшения задержки.