с concurrent.futures.ThreadPoolExecutor () в качестве исполнителя: ... не ждет - PullRequest
0 голосов
/ 04 июля 2019

Я пытаюсь использовать ThreadPoolExecutor() в методе класса для создания пула потоков, которые будут выполнять другой метод в том же классе. У меня есть with concurrent.futures.ThreadPoolExecutor()..., однако он не ждет, и выдается сообщение о том, что в словаре, к которому я обращаюсь после оператора «with ...», не было ключа. Я понимаю, почему выдается ошибка, потому что словарь еще не обновлен, потому что потоки в пуле не завершили выполнение. Я знаю, что потоки не завершились, потому что у меня есть печать ("done") в методе, который вызывается в ThreadPoolExecutor, и "done" не выводится на консоль.

Я новичок в темах, поэтому, если какие-либо предложения о том, как сделать это лучше, приветствуются!

    def tokenizer(self):
        all_tokens = []
        self.token_q = Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for num in range(5):
                executor.submit(self.get_tokens, num)
            executor.shutdown(wait=True)

        print("Hi")
        results = {}
        while not self.token_q.empty():
            temp_result = self.token_q.get()
            results[temp_result[1]] = temp_result[0]
            print(temp_result[1])
        for index in range(len(self.zettels)):
            for zettel in results[index]:
                all_tokens.append(zettel)
        return all_tokens

    def get_tokens(self, thread_index):
        print("!!!!!!!")
        switch = {
            0: self.zettels[:(len(self.zettels)/5)],
            1: self.zettels[(len(self.zettels)/5): (len(self.zettels)/5)*2],
            2: self.zettels[(len(self.zettels)/5)*2: (len(self.zettels)/5)*3],
            3: self.zettels[(len(self.zettels)/5)*3: (len(self.zettels)/5)*4],
            4: self.zettels[(len(self.zettels)/5)*4: (len(self.zettels)/5)*5],
        }
        new_tokens = []
        for zettel in switch.get(thread_index):
            tokens = re.split('\W+', str(zettel))
            tokens = list(filter(None, tokens))
            new_tokens.append(tokens)
        print("done")
        self.token_q.put([new_tokens, thread_index])

'' '

Ожидается увидеть все операторы print("!!!!!!") и print("done") до оператора print ("Hi"). На самом деле показывает !!!!!!!, затем Hi, затем KeyError для словаря результатов.

Ответы [ 2 ]

0 голосов
/ 04 июля 2019

Как вы уже узнали, пул ждет ; print('done') никогда не выполняется, потому что предположительно TypeError поднимается раньше.
Пул непосредственно не ожидает завершения задач, он ожидает присоединения своих рабочих потоков, что неявно требует выполнения задач в одну сторону (успех) или другой (исключение).

Причина, по которой вы не видите возникновения исключений, заключается в том, что задача заключена в Future. A Future

[...] инкапсулирует асинхронное выполнение вызываемого объекта.

Future экземпляры возвращаются методом submit исполнителя, и они позволяют запрашивать состояние выполнения и получать доступ к любому результату.

Это подводит меня к некоторым замечаниям, которые я хотел сделать.

Queue in self.token_q кажется ненужным
Судя по предоставленному вами коду, вы используете эту очередь только для передачи результатов ваших задач обратно в функцию tokenizer. Это не нужно, вы можете получить к нему доступ с Future, который возвращает вызов submit:

def tokenizer(self):
    all_tokens = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_tokens, num) for num in range(5)]
        # executor.shutdown(wait=True) here is redundant, it is called when exiting the context:
        # https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L623

    print("Hi")
    results = {}
    for fut in futures:
        try:
            res = fut.result()
            results[res[1]] = res[0]
        except Exception:
            continue
    [...] 

def get_tokens(self, thread_index):
    [...]
    # instead of self.token_q.put([new_tokens, thread_index])
    return new_tokens, thread_index

Вполне вероятно, что ваша программа не выиграет от использования потоков
Из кода, которым вы поделились, кажется, что операции в get_tokens связаны с процессором, а не с вводом / выводом Если вы запускаете свою программу на CPython (или любом другом интерпретаторе, использующем Global Interpreter Lock ), использование потоков в этом случае не принесет пользы.

В CPython глобальная блокировка интерпретатора или GIL является мьютексом, который защищает доступ к объектам Python, предотвращая одновременное выполнение байт-кодами Python несколькими потоками.

Это означает, что для любого процесса Python в любой момент времени может выполняться только один поток. Это не такая большая проблема, если ваша задача связана с вводом / выводом, то есть часто приостанавливает ожидание ввода / вывода (например, для данных в сокете). Если ваши задачи должны постоянно выполнять байт-код в процессоре, нет смысла останавливать один поток, чтобы другой мог выполнить некоторые инструкции. На самом деле результирующие переключения контекста могут даже оказаться вредными.
Возможно, вы захотите пойти на параллелизм вместо параллелизма . Для этого взгляните на ProcessPoolExecutor.
Тем не менее, я рекомендую тестировать ваш код, работающий последовательно, параллельно и параллельно. Создание процессов или потоков обходится дорого, и, в зависимости от выполняемой задачи, это может занять больше времени, чем просто выполнение одной задачи за другой последовательным образом.


Кроме того, это выглядит немного подозрительно:

for index in range(len(self.zettels)):
    for zettel in results[index]:
        all_tokens.append(zettel)

results, кажется, всегда имеет пять предметов, потому что for num in range(5). Если длина self.zettels больше пяти, я бы ожидал, что здесь повысится KeyError.
Если гарантировано, что self.zettels будет иметь длину пять, я бы увидел потенциал для некоторой оптимизации кода здесь.

0 голосов
/ 04 июля 2019

Вам необходимо выполнить цикл по concurrent.futures.as_completed (), как показано здесь . Он выдаст значения по завершении каждого потока.

...