Как убить висящих рабочих в многопроцессорной среде - PullRequest
0 голосов
/ 13 марта 2019

Я использую spyder и python 2.7 для многопроцессорной обработки. У меня есть код, который уже работает, если итеративная длина мала (то есть пять элементов). Но когда итеративный размер большой (то есть 120 элементов), многопроцессорная карта python, кажется, неправильно обрабатывает некоторые процессы. Не существует определенного шаблона, и если я перезапущу код, то повторное обращение произойдет снова, но с другим шаблоном. Он ведет себя так, как будто он теряет повторяемость из памяти. Часть кода, который я использую, показана ниже. Неправильная обработка происходит в методе createShapefile ();

def testMethod(AREA_ID, PARENT_ID, routingPlanID, StationNameDic, message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod, StationName):
    try:
        message_testMethod="OK"
        BranchIDL=[]
        branchid=""
        AREA_ID=AREA_ID.encode('utf-8')
        print "Processing: ",StationName  

        if StationName in StationNameDic:
            RouteInfo=StationNameDic[StationName]
        else:
            RouteInfo = ["ALL", "ALL", "PRESERVE_BOTH", ["OnewaySelective","Footway","Steps","Path","Tolls"], "OR", "Α,Β,Γ", "NGIS", "RA"]
        print RouteInfo
        i=0
        for item in RouteInfo:
            if i==0:
                GroupIDs=item
            elif i==1:
                BranchID=item
                if BranchID=="ALL":    
                    cn = pyodbc.connect('DRIVER={SQL Server};SERVER=dataWARE4;DATABASE=ACS_ABP;trusted_connection=yes')
                    sql = "SELECT DISTINCT BRANCH_ID FROM abp_detail with(nolock,readuncommitted) WHERE STATION_ID='"+StationName+"'and Parent_ID in ("+PARENT_ID+") and Area_ID_1='"+AREA_ID+"'"
                    print sql
                    sql=sql.decode('utf-8')
                    cur = cn.cursor()
                    cur.execute(sql)
                    branchid = cur.fetchall()
                    cn.commit()
                    #print branchid
                    for row in branchid:
                        branch_id=row[0]
                        BranchIDL.append(branch_id)
                else:
                    BranchIDL=BranchID.split(",")                                            
            elif i==2:
                EndPoint=item
            elif i==3:
                RoadRules=item
            elif i==4:
                RouteType=item
            elif i==5:
                RouteArea=item
            elif i==6:
                ACSCoords=item
            elif i==7:
                RoutingMethod=item
            i=i+1


        if AREA_ID in RouteArea:
            for branchid in BranchIDL:
                branch_id=str(branchid)
                noRouteFlag, pointGeometryList, RouteId, stationRoutes_Dic, SubRoutesList, totalStops, message_selectStopsNoGroup, routeDeliv =selectStopsNoGroup(ACSCoords, StationName, PARENT_ID, branch_id, AREA_ID, RouteType, GroupIDs, message_selectStopsNoGroup)
                if message_selectStopsNoGroup !="selectStopsNoGroup_OK":
                    continue
                if noRouteFlag==1:
                    continue
                Store_shp, message_createStore=createStore(StationName, stationRoutes_Dic, branch_id, RoutingMethod)
                if message_createStore!="createStore_OK":
                    continue
                minBin, routeStops, SortedStationStops_shp, StationDeliveries_shp, message_createShapefile = createShapefile(routeDeliv, StationName, Store_shp, pointGeometryList, branch_id, stationRoutes_Dic, PARENT_ID, AREA_ID, RouteType, SubRoutesList, message_createShapefile,StationCode)
                if message_createShapefile!="createShapefile_OK":
                    continue              
                if RoutingMethod=="VRP":
                    ExpStops_shp, groupRouteidsDic, RouteId, Route_shp, sortedExpStops_shp, Stops_shp, message_VRP = vrp(RoadRules, RouteType, StationName, Store_shp, SortedStationStops_shp, StationDeliveries_shp, minBin, RouteId, message_VRP)
                else:
                    ExpStops_shp, groupRouteidsDic, RouteId, Route_shp, sortedExpStops_shp, Stops_shp, message_RA  = ra(EndPoint, RoadRules, StationName, Store_shp, SortedStationStops_shp, StationDeliveries_shp, stationRoutes_Dic, branch_id, message_RA)
                    if message_RA!="ra_OK":
                        continue
                if  insABPDetail==1:
                    message_insABPDetail = insertABPDetail(StationName, sortedExpStops_shp, PARENT_ID, AREA_ID, message_insABPDetail)
                if updRoutingMaster==1:
                    if RouteType=="CR":
                        message_insertRoutingMaster = insertRoutingMaster(StationName, branch_id, PARENT_ID, AREA_ID, Route_shp, RouteType, message_insertRoutingMaster)
                    elif RouteType=="OR":
                        message_updateRoutingMaster = updateRoutingMaster(RouteId, routingPlanID, message_updateRoutingMaster)
                        time_elapsed7 = (time.clock() - time_start7)
                if updRoutingPaths==1:
                    message_updRoutingPaths = updateRoutingPathsAll(branch_id, groupRouteidsDic, RouteId, Route_shp, StationDeliveries_shp, StationName, Stops_shp, message_updRoutingPaths)

    except Exception as e:
        # If an error occurred, print line number and error message
        import traceback, sys
        tb = sys.exc_info()[2]
        print "An error occured on line %i" % tb.tb_lineno
        print str(e)
        message_testMethod= "An error occured on line %i" % tb.tb_lineno+" "+str(e)

   # return noRouteFlag, RouteId, stationRoutes_Dic, SubRoutesList, totalStops, message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod
    return message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod


if __name__ == '__main__':
    time_start = time.clock()      
    AREA_ID, PARENT_ID, routes_data, routingPlanID, StationName, stationCode_Dic = getRouteDetails(StationName, PARENT_ID, AREA_ID)
    copyDailyABPDetail(PARENT_ID, AREA_ID)
    AREA_ID=AREA_ID.decode('utf-8')
    StationsList=[]
    for rowStations in routes_data:
        StationName=rowStations[0].encode('utf-8')
        print "StationName", StationName
        StationsList.append(StationName) 
    func = partial(testMethod, AREA_ID, PARENT_ID, routingPlanID, StationNameDic,message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod)

    cpuNum = multiprocessing.cpu_count()
    print "cpuNum", cpuNum 
    # Create the pool object  
    pool = multiprocessing.Pool(processes=cpuNum) 

    # Fire off list to worker function.  
    # res is a list that is created with what ever the worker function is returning  
    results=pool.map(func,StationsList, chunksize=1)        
    pool.close()  
    pool.join()  
    print results

Методом проб и ошибок я выяснил, что если я установлю число процессоров на один меньше максимального и размер фрагмента будет равен 1, то я получу наименьшее количество ошибок. Моя главная проблема в том, что когда происходит неправильное обращение, программа никогда не заканчивается. Как я могу установить тайм-аут в методе createShapefile (), скажем, 2 минуты, чтобы в случае превышения убить работника и перейти к следующей итерации?

...