Как заставить многопроцессорный скрипт работать с Linux на Windows? - PullRequest
0 голосов
/ 27 марта 2019

Проблема в том, что скрипт работает правильно в Linux без ошибок.Я провел некоторое исследование и обнаружил, что это проблема в Windows, когда создаются дочерние процессы, но некоторые люди говорили, что есть способ заставить это работать

Я пытался использовать multiprocessing.set_start_method с каждой комбинацией форкаспавн и форксервер.Ни один из них не сработал.

import os, zipfile, sys
import multiprocessing
from tqdm import *
import time
import argparse
import logging

def time_conversion(progess_bar):
    tmp_string=str(progess_bar)
    poz=tmp_string.find("[")
    a=int(tmp_string[poz+1]+tmp_string[poz+2])*60+int(tmp_string[poz+4]+tmp_string[poz+5])
    return a
def myFunc(no_proc, queue,no_files,progess_bar):
    # args meaning
    # no_proc = number of the process [0,process_limit-1]
    # queue = the queue for every process containing all the zip files soon to be unpacked
    # no_files[no_proc] = the total number of all the files from every zip file from queue for the no_proc process
    # progess_bar[no_proc] = the variable containing the progress bar for the no_proc process
    last_elapsed=0
    elapsed=0
    extracted=0
    total_archives=queue.qsize()
    while(not queue.empty()):
        item = queue.get()
        zip_current = zipfile.ZipFile(item)
        for i in zip_current.namelist():
            progess_bar.set_postfix(zipName=item, unpacking="/.." + i[-10:],
                              zipSize=str(os.path.getsize(dir_name + "/" + item) // 1024) + "KB",
                              zipsDone=(str(extracted) + "/" + str(total_archives)), refresh=True)
            zip_current.extract(i, dir_name)
            elapsed=time_conversion(progess_bar)#elapsed time in seconds
            if elapsed-last_elapsed>=ttimeout and ttimeout>-1:
                logging.error(f"The time limit for a single zipfile of {ttimeout} seconds has been reached, the program stopped")
                break
            progess_bar.update(1)
            progess_bar.refresh()
        logging.info(f"Archive {item} has been unpacked successfully in {elapsed-last_elapsed} seconds")
        zip_current.close()
        extracted+=1
        progess_bar.set_postfix(zipName=item, unpacking="/.." + i[-10:],
                                zipSize=str(os.path.getsize(dir_name + "/" + item) // 1024) + "KB",
                                zipsDone=(str(extracted) + "/" + str(total_archives)), refresh=True)
        last_elapsed=elapsed
if __name__ == '__main__':

    multiprocessing.set_start_method('spawn')
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", type=int,default=1,help="The number of total processes. Default value is 1")
    parser.add_argument("-log",type=str,default="log.txt",help="The location of the output logfile. Default if log.txt")
    parser.add_argument("-ttimeout",type=int,default=-1,help="The maximum ammount of seconds executed on a single zip file. Default is unlimited")
    parser.add_argument("-gtimeout",type=int,default=-1,help="The maximum ammount of seconds executed in total. Default is unlimited")
    parser.add_argument("dir",type=str,help="The directory in which zip files are located")
    args=parser.parse_args()
    logging.basicConfig(filename=args.log,level=logging.INFO,
                        format="%(levelname)s at %(asctime)s -> %(message)s")
    process_limit = args.p
    dir_name=args.dir
    logfile=args.log
    ttimeout=args.ttimeout
    gtimeout=args.gtimeout
    extension = ".zip"
    logging.info(f"The program has the following values:\n{dir_name} is the directory, {process_limit} processes, {'unlimited' if ttimeout==-1 else ttimeout } seconds for timeout/zipfile, {'unlimited' if gtimeout==-1 else gtimeout} seconds for global running,")
    dir_name="E:\\zips"
    os.chdir(dir_name)
    list_processes = []
    i = 0
    list = os.listdir(dir_name)
    files = []
    for i in range(0, len(list)):
        if (list[i].endswith(extension)):
            files.append(list[i])
    if process_limit>len(files):
        process_limit=len(files)
    queue = [multiprocessing.Queue() for i in range(process_limit)]
    no_files=[0]*process_limit
    for i in range(len(files)):
        queue[i%process_limit].put(files[i])
        zip_ref = zipfile.ZipFile(files[i])
        no_files[i%process_limit]+=len(zip_ref.namelist())
    progess_bar=[]
    for i in range(0,process_limit):
        progess_bar.append(tqdm(total=no_files[i],ncols=150,position=i+1))

    for no_proc in range(0, process_limit):
            single_process = multiprocessing.Process(target=myFunc, args=(no_proc, queue[no_proc], no_files[no_proc],progess_bar[no_proc]))
            list_processes.append(single_process)
            single_process.start() ##### The problem is here, i guess
    for single_process in list_processes:
        if gtimeout>-1:
            single_process.join(gtimeout)
            if single_process.is_alive():
                logging.error(f"The global timeout of {gtimeout} seconds has been reached, killing all processesses")
                for single_process in list_processes:
                    single_process.terminate()
                logging.error("The processes have been killed succesfully")
                break
        else:
            process.join()
    print("\n"*process_limit)

Когда я пытаюсь запустить его в CMD, я получаю эту ошибку, и программа никогда не завершается, я должен остановить ее вручную.

Traceback (most recent call last):
  File "test5.py", line 85, in <module>
    single_process.start()
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object

C:\Users\ciuzb\Desktop>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\Users\ciuzb\AppData\Local\Programs\Python\Python37-32\lib\multiprocessing\reduction.py", line 82, in steal_handle
    _winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect

Можеткто-то, пожалуйста, объясните, почему я получаю эту ошибку и, возможно, решение?

...