Исполнитель ProcessPool, выполняющий ThreadPoolexecutor, не выполнен - PullRequest
0 голосов
/ 26 февраля 2020

Я работал над этим сценарием, используя ThreadPools и ProcessPools. Мой фокус - это параллельный сценарий, когда все задачи, которые мне нужно выполнить, чтобы завершить всю работу, будут выполняться asyn c, «огромным процессом». Позже к этому я добавил GUI, так что теперь родительский процесс - это процесс GUI, который вызывает «огромный процесс» при нажатии кнопки, используя многопроцессорный метод Process.

Проблема, с которой я столкнулся, заключается в том, что процесс grandchild поднимает threadPool для выполнения простого reports.head, не возвращает никакой информации изнутри ThreadPool и не зависает, не показывает никаких ошибок ... только кажется, что пропускает вызов и продолжает.

(Кстати, я использую многопроцессорную обработку. Manager.Lock () и многопоточность. Lock (), чтобы избежать условий гонки, которые я удалил из кода, который я публикую, потому что в моем реальном коде, с ними или без них, похоже, это не решает проблему, и по вопросам, которые я задаю » мы пытались сделать это проще)

что я получу:

$ python example.py
process_set
Subprocess start
Before process
https://www.google.com/
https://stackoverflow.com/
https://www.wikipedia.org/
https://httpstat.us/200
https://httpstat.us/404
https://httpstat.us/502
After process
['https://www.google.com/',200,'OK']
['https://stackoverflow.com/',200,'OK']
['https://www.wikipedia.org/',200,'OK']
['https://httpstat.us/200',200,'OK']
['https://httpstat.us/404',404, 'Not Found']
['https://httpstat.us/502',502,'Bad Gateway']
Subprocess finish
Info was generated!
process_reset

что я получу:

$ python example.py
process_set
Subprocess start
Before process
https://www.google.com/
https://stackoverflow.com/
https://www.wikipedia.org/
https://httpstat.us/200
https://httpstat.us/404
https://httpstat.us/502
Subprocess finish
Info was generated!
process_reset

я пробовал:

  • изменение файла request.head с помощью urllib.request.get для выполнения некоторого аналога в качестве цели для ThreadPool.

  • изменение функции назначения для некоторых проще (печать), и процесс работает нормально.

  • выполнить «url_check.py» и «огромный_процесс» ess.py 'само по себе, и они работали нормально.

    Это структура, которую я получил.

    +-- example/
    |   +-- __init__.py
    |   +-- example.py
    |   +-- huge_process.py
    |   +-- url_check/
        |   +-- __init__.py
        |   +-- url_check.py
    

example.py

#!/usr/bin/env python
from tkinter import Tk, BOTH
from tkinter.ttk import Frame, Button
from multiprocessing import Process
from huge_process import bigFunction

global root
global current_proc


current_proc = Process(target=bigFunction)#set process
print('process_set')

class Example(Frame):

    def __init__(self, master):
        super().__init__()
        self.initGUI()

    def initGUI(self):
        self.master.title('Process Example')
        self.pack(fill=BOTH, expand=1)
        #Button
        self.btn_Generate = Button(self, text = 'Generate', command = self.generateInfo)
        self.btn_Generate.pack()

    def generateInfo(self):
        global current_proc
        self.btn_Generate.config(text='Generating...')
        self.btn_Generate['state'] = 'disable'
        current_proc.start()
        self.blockFunction()

    def blockFunction(self):
        global current_proc
        if current_proc.is_alive():
            self.btn_Generate['state'] = 'disable'
        else:
            if self.btn_Generate.cget('text') == 'Generating...':
                self.btn_Generate.config(text='Generate')
                self.btn_Generate['state'] = 'normal'
                current_proc = Process(target=bigFunction)#reset process
                print('process_reset')
        self.master.after(100,self.blockFunction)


def main():
    root = Tk()
    app = Example(root)
    root.after(100,app.blockFunction)
    root.mainloop()

if __name__ == '__main__':
    main()

огромный_процесс.py

from traceback import format_exc
from concurrent.futures import ProcessPoolExecutor, wait
from url_check import url_check

def genInfo():
    print('Subprocess start')
    try:
        with ProcessPoolExecutor() as executor:
            futures=[]
            futures.append(executor.submit(url_check))
        #   futures.append(executor.submit( 'Diferent target'))

    except Exception as e:
        print(format_exc())

    print('Subprocess finish')


def bigFunction():
    #do some stuff
    genInfo()
    print('Info was generated!')
    #do more stuff

#bigFunction()

url_check.py

import concurrent.futures, requests
from threading import Lock
from http.client import responses
from requests.exceptions import Timeout,ConnectionError
from traceback import format_exc


global url_list
global results
global lock

lock = Lock()
url_list=[
'https://www.google.com/',
'https://stackoverflow.com/',
'https://www.wikipedia.org/',
'https://httpstat.us/200',
'https://httpstat.us/404',
'https://httpstat.us/502'
]


def ping_website(url_):
    try:
        response = requests.head(url_, timeout=5)
        print('url: {}......status_code: {} : {}'.format(url_, response.status_code, responses[response.status_code]))      
    except:
        print(format_exc())

    return [url_,response.status_code,responses[response.status_code]]

def genThreads(url_list):
    global results
    results = []
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers = 4) as executor:
            futures = {executor.submit(ping_website, url) for url in url_list}

        for future in concurrent.futures.as_completed(futures):
            r = future.result()
            results.append(r)
    except:
        print(format_exc())

    return results

def url_check():
    global url_list

    print('Before process')
    for url in url_list:
        print(url)
    try:
        url_list = genThreads(url_list)
    except:
        print(format_exc())
    print('After process')
    for url in url_list:
        print(url)

#url_check()

Спасибо за поддержку!

...