У меня есть фрейм данных длиной в несколько тысяч строк, который содержит две пары координат GPS в одном из столбцов, с помощью которых я пытаюсь рассчитать время в пути между этими координатами.У меня есть функция, которая принимает эти координаты и возвращает время в пути, и для расчета каждой записи может потребоваться 3-8 секунд.Таким образом, весь процесс может занять довольно много времени.Я хотел бы иметь возможность: используя 3-5 потоков, перебирать список, вычислять время в пути и переходить к следующей записи, пока другие потоки завершаются и не создают более 5 потоков впроцесс.Независимо, у меня все работает - я могу запускать несколько потоков, я могу отслеживать количество потоков и ждать, пока максимальное количество разрешенных потоков упадет ниже предела, до следующего запуска, и может перебирать кадр данных и вычислять время диска.Тем не менее, я испытываю затруднения, соединяя все это вместе.Вот отредактированная, уменьшенная версия того, что у меня есть.
import pandas
import threading
import arcgis
class MassFunction:
#This is intended to keep track of the active threads
MassFunction.threadCount = 0
def startThread(functionName,params=None):
#This kicks off a new thread and should count up to keep track of the threads
MassFunction.threadCount +=1
if params is None:
t = threading.Thread(target=functionName)
else:
t = threading.Thread(target=functionName,args=[params])
t.daemon = True
t.start()
class GeoAnalysis:
#This class handles the connection to the ArcGIS services
def __init__(self):
super(GeoAnalysis, self).__init__()
self.my_gis = arcgis.gis.GIS("https://www.arcgis.com", username, pw)
def drivetimeCalc(self, coordsString):
#The coords come in as a string, formatted as 'lat_1,long_1,lat_2,long_2'
#This is the bottleneck of the process, as this calculation/response
#below takes a few seconds to get a response
points = coordsString.split(", ")
route_service_url = self.my_gis.properties.helperServices.route.url
self.route_layer = arcgis.network.RouteLayer(route_service_url, gis=self.my_gis)
point_a_to_point_b = "{0}, {1}; {2}, {3}".format(points[1], points[0], points[3], points[2])
result = self.route_layer.solve(stops=point_a_to_point_b,return_directions=False, return_routes=True,output_lines='esriNAOutputLineNone',return_barriers=False, return_polygon_barriers=False,return_polyline_barriers=False)
travel_time = result['routes']['features'][0]['attributes']['Total_TravelTime']
#This is intended to 'remove' one of the active threads
MassFunction.threadCount -=1
return travel_time
class MainFunction:
#This is to give access to the GeoAnalysis class from this class
GA = GeoAnalysis()
def closureDriveTimeCalc(self,coordsList):
#This is intended to loop in the event that a fifth loop gets started and will prevent additional threads from starting
while MassFunction.threadCount > 4:
pass
MassFunction.startThread(MainFunction.GA.drivetimeCalc,coordsList)
def driveTimeAnalysis(self,location):
#This reads a csv file containing a few thousand entries.
#Each entry/row contains gps coordinates, which need to be
#iterated over to calculate the drivetimes
locationMemberFile = pandas.read_csv(someFileName)
#The built-in apply() method in pandas seems to be the
#fastest way to iterate through the rows
locationMemberFile['DRIVETIME'] = locationMemberFile['COORDS_COL'].apply(self.closureDriveTimeCalc)
Когда я запускаю это прямо сейчас, используя VS Code, я вижу, как число потоков увеличивается до тысяч в стеке вызовов, поэтому ячувствую, что он не ждет завершения потока и добавления / вычитания из значения threadCount.Будем очень благодарны за любые идеи / предложения / советы.
РЕДАКТИРОВАТЬ: По существу, моя проблема заключается в том, как вернуть значение travel_time, чтобы оно могло быть помещено в фрейм данных.В настоящее время у меня нет оператора возврата для функции closureDriveTimeCalc, поэтому, хотя функция работает правильно, она не отправляет информацию обратно в метод apply ().