python ThreadPoolExecutor закрывает все потоки, когда я получаю результат - PullRequest
0 голосов
/ 01 августа 2020
• 1000 метод возвращает ответ и закрывает другие все еще активные потоки.

Пока мой код выглядит так (вероятно, наивно):

from concurrent.futures import ThreadPoolExecutor, as_completed
# class initiation etc

max_workers = cpu_count() * 5
urls = [url_to_open] * 50

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_url=[]
    for url in urls: # i had to do a loop to include sleep not to overload the proxy server
        future_to_url.append(executor.submit(self.get_with_random_proxy_using_chain,
                                     url,
                                     timeout,
                                     update_proxy_score,
                                     unwanted_keywords,
                                     unwanted_status_codes,
                                     random_universe_size,
                                     file_path_to_save_streamed_content))
        sleep(0.5)

    for future in as_completed(future_to_url):
            if future.result() is not None:
                return future.result()

Но он запускает все потоки.

Есть ли способ закрыть все потоки после завершения первого будущего. Я использую windows и python 3.7x

Пока я нашел эту ссылку , но мне не удается заставить ее работать (программа все еще работает в течение длительного времени) .

1 Ответ

1 голос
/ 02 августа 2020

Насколько мне известно, запущенные фьючерсы не могут быть отменены. Об этом написано довольно много. И есть даже некоторые обходные пути.

Но я бы посоветовал внимательнее взглянуть на модуль asyncio. Он вполне подходит для таких задач.

Ниже приведен простой пример, когда выполняется несколько одновременных запросов, и при получении первого результата остальные отменяются.

import asyncio
from typing import Set

from aiohttp import ClientSession


async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()


async def wait_for_first_response(tasks):
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for p in pending:
        p.cancel()
    return done.pop().result()


async def request_one_of(*urls):
    tasks = set()
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.create_task(fetch(url, session))
            tasks.add(task)

        return await wait_for_first_response(tasks)


async def main():
    response = await request_one_of("https://wikipedia.org", "https://apple.com")
    print(response)

asyncio.run(main())
...