Я использую spyder и python 2.7 для многопроцессорной обработки. У меня есть код, который уже работает, если итеративная длина мала (то есть пять элементов). Но когда итеративный размер большой (то есть 120 элементов), многопроцессорная карта python, кажется, неправильно обрабатывает некоторые процессы. Не существует определенного шаблона, и если я перезапущу код, то повторное обращение произойдет снова, но с другим шаблоном. Он ведет себя так, как будто он теряет повторяемость из памяти. Часть кода, который я использую, показана ниже. Неправильная обработка происходит в методе createShapefile ();
def testMethod(AREA_ID, PARENT_ID, routingPlanID, StationNameDic, message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod, StationName):
try:
message_testMethod="OK"
BranchIDL=[]
branchid=""
AREA_ID=AREA_ID.encode('utf-8')
print "Processing: ",StationName
if StationName in StationNameDic:
RouteInfo=StationNameDic[StationName]
else:
RouteInfo = ["ALL", "ALL", "PRESERVE_BOTH", ["OnewaySelective","Footway","Steps","Path","Tolls"], "OR", "Α,Β,Γ", "NGIS", "RA"]
print RouteInfo
i=0
for item in RouteInfo:
if i==0:
GroupIDs=item
elif i==1:
BranchID=item
if BranchID=="ALL":
cn = pyodbc.connect('DRIVER={SQL Server};SERVER=dataWARE4;DATABASE=ACS_ABP;trusted_connection=yes')
sql = "SELECT DISTINCT BRANCH_ID FROM abp_detail with(nolock,readuncommitted) WHERE STATION_ID='"+StationName+"'and Parent_ID in ("+PARENT_ID+") and Area_ID_1='"+AREA_ID+"'"
print sql
sql=sql.decode('utf-8')
cur = cn.cursor()
cur.execute(sql)
branchid = cur.fetchall()
cn.commit()
#print branchid
for row in branchid:
branch_id=row[0]
BranchIDL.append(branch_id)
else:
BranchIDL=BranchID.split(",")
elif i==2:
EndPoint=item
elif i==3:
RoadRules=item
elif i==4:
RouteType=item
elif i==5:
RouteArea=item
elif i==6:
ACSCoords=item
elif i==7:
RoutingMethod=item
i=i+1
if AREA_ID in RouteArea:
for branchid in BranchIDL:
branch_id=str(branchid)
noRouteFlag, pointGeometryList, RouteId, stationRoutes_Dic, SubRoutesList, totalStops, message_selectStopsNoGroup, routeDeliv =selectStopsNoGroup(ACSCoords, StationName, PARENT_ID, branch_id, AREA_ID, RouteType, GroupIDs, message_selectStopsNoGroup)
if message_selectStopsNoGroup !="selectStopsNoGroup_OK":
continue
if noRouteFlag==1:
continue
Store_shp, message_createStore=createStore(StationName, stationRoutes_Dic, branch_id, RoutingMethod)
if message_createStore!="createStore_OK":
continue
minBin, routeStops, SortedStationStops_shp, StationDeliveries_shp, message_createShapefile = createShapefile(routeDeliv, StationName, Store_shp, pointGeometryList, branch_id, stationRoutes_Dic, PARENT_ID, AREA_ID, RouteType, SubRoutesList, message_createShapefile,StationCode)
if message_createShapefile!="createShapefile_OK":
continue
if RoutingMethod=="VRP":
ExpStops_shp, groupRouteidsDic, RouteId, Route_shp, sortedExpStops_shp, Stops_shp, message_VRP = vrp(RoadRules, RouteType, StationName, Store_shp, SortedStationStops_shp, StationDeliveries_shp, minBin, RouteId, message_VRP)
else:
ExpStops_shp, groupRouteidsDic, RouteId, Route_shp, sortedExpStops_shp, Stops_shp, message_RA = ra(EndPoint, RoadRules, StationName, Store_shp, SortedStationStops_shp, StationDeliveries_shp, stationRoutes_Dic, branch_id, message_RA)
if message_RA!="ra_OK":
continue
if insABPDetail==1:
message_insABPDetail = insertABPDetail(StationName, sortedExpStops_shp, PARENT_ID, AREA_ID, message_insABPDetail)
if updRoutingMaster==1:
if RouteType=="CR":
message_insertRoutingMaster = insertRoutingMaster(StationName, branch_id, PARENT_ID, AREA_ID, Route_shp, RouteType, message_insertRoutingMaster)
elif RouteType=="OR":
message_updateRoutingMaster = updateRoutingMaster(RouteId, routingPlanID, message_updateRoutingMaster)
time_elapsed7 = (time.clock() - time_start7)
if updRoutingPaths==1:
message_updRoutingPaths = updateRoutingPathsAll(branch_id, groupRouteidsDic, RouteId, Route_shp, StationDeliveries_shp, StationName, Stops_shp, message_updRoutingPaths)
except Exception as e:
# If an error occurred, print line number and error message
import traceback, sys
tb = sys.exc_info()[2]
print "An error occured on line %i" % tb.tb_lineno
print str(e)
message_testMethod= "An error occured on line %i" % tb.tb_lineno+" "+str(e)
# return noRouteFlag, RouteId, stationRoutes_Dic, SubRoutesList, totalStops, message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod
return message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod
if __name__ == '__main__':
time_start = time.clock()
AREA_ID, PARENT_ID, routes_data, routingPlanID, StationName, stationCode_Dic = getRouteDetails(StationName, PARENT_ID, AREA_ID)
copyDailyABPDetail(PARENT_ID, AREA_ID)
AREA_ID=AREA_ID.decode('utf-8')
StationsList=[]
for rowStations in routes_data:
StationName=rowStations[0].encode('utf-8')
print "StationName", StationName
StationsList.append(StationName)
func = partial(testMethod, AREA_ID, PARENT_ID, routingPlanID, StationNameDic,message_selectStopsNoGroup, message_createShapefile, message_RA, message_VRP, message_insABPDetail, message_insertRoutingMaster, message_updateRoutingMaster, message_updRoutingPaths, message_testMethod)
cpuNum = multiprocessing.cpu_count()
print "cpuNum", cpuNum
# Create the pool object
pool = multiprocessing.Pool(processes=cpuNum)
# Fire off list to worker function.
# res is a list that is created with what ever the worker function is returning
results=pool.map(func,StationsList, chunksize=1)
pool.close()
pool.join()
print results
Методом проб и ошибок я выяснил, что если я установлю число процессоров на один меньше максимального и размер фрагмента будет равен 1, то я получу наименьшее количество ошибок. Моя главная проблема в том, что когда происходит неправильное обращение, программа никогда не заканчивается. Как я могу установить тайм-аут в методе createShapefile (), скажем, 2 минуты, чтобы в случае превышения убить работника и перейти к следующей итерации?