Как вы уже узнали, пул ждет ; print('done')
никогда не выполняется, потому что предположительно TypeError
поднимается раньше.
Пул непосредственно не ожидает завершения задач, он ожидает присоединения своих рабочих потоков, что неявно требует выполнения задач в одну сторону (успех) или другой (исключение).
Причина, по которой вы не видите возникновения исключений, заключается в том, что задача заключена в Future
. A Future
[...] инкапсулирует асинхронное выполнение вызываемого объекта.
Future
экземпляры возвращаются методом submit
исполнителя, и они позволяют запрашивать состояние выполнения и получать доступ к любому результату.
Это подводит меня к некоторым замечаниям, которые я хотел сделать.
Queue
in self.token_q
кажется ненужным
Судя по предоставленному вами коду, вы используете эту очередь только для передачи результатов ваших задач обратно в функцию tokenizer
. Это не нужно, вы можете получить к нему доступ с Future
, который возвращает вызов submit
:
def tokenizer(self):
all_tokens = []
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(get_tokens, num) for num in range(5)]
# executor.shutdown(wait=True) here is redundant, it is called when exiting the context:
# https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L623
print("Hi")
results = {}
for fut in futures:
try:
res = fut.result()
results[res[1]] = res[0]
except Exception:
continue
[...]
def get_tokens(self, thread_index):
[...]
# instead of self.token_q.put([new_tokens, thread_index])
return new_tokens, thread_index
Вполне вероятно, что ваша программа не выиграет от использования потоков
Из кода, которым вы поделились, кажется, что операции в get_tokens
связаны с процессором, а не с вводом / выводом Если вы запускаете свою программу на CPython (или любом другом интерпретаторе, использующем Global Interpreter Lock ), использование потоков в этом случае не принесет пользы.
В CPython глобальная блокировка интерпретатора или GIL является мьютексом, который защищает доступ к объектам Python, предотвращая одновременное выполнение байт-кодами Python несколькими потоками.
Это означает, что для любого процесса Python в любой момент времени может выполняться только один поток. Это не такая большая проблема, если ваша задача связана с вводом / выводом, то есть часто приостанавливает ожидание ввода / вывода (например, для данных в сокете). Если ваши задачи должны постоянно выполнять байт-код в процессоре, нет смысла останавливать один поток, чтобы другой мог выполнить некоторые инструкции. На самом деле результирующие переключения контекста могут даже оказаться вредными.
Возможно, вы захотите пойти на параллелизм вместо параллелизма . Для этого взгляните на ProcessPoolExecutor
.
Тем не менее, я рекомендую тестировать ваш код, работающий последовательно, параллельно и параллельно. Создание процессов или потоков обходится дорого, и, в зависимости от выполняемой задачи, это может занять больше времени, чем просто выполнение одной задачи за другой последовательным образом.
Кроме того, это выглядит немного подозрительно:
for index in range(len(self.zettels)):
for zettel in results[index]:
all_tokens.append(zettel)
results
, кажется, всегда имеет пять предметов, потому что for num in range(5)
. Если длина self.zettels
больше пяти, я бы ожидал, что здесь повысится KeyError
.
Если гарантировано, что self.zettels
будет иметь длину пять, я бы увидел потенциал для некоторой оптимизации кода здесь.