Как использовать несколько, но ограниченное количество потоков в Python для обработки списка - PullRequest
0 голосов
/ 09 февраля 2019

У меня есть фрейм данных длиной в несколько тысяч строк, который содержит две пары координат GPS в одном из столбцов, с помощью которых я пытаюсь рассчитать время в пути между этими координатами.У меня есть функция, которая принимает эти координаты и возвращает время в пути, и для расчета каждой записи может потребоваться 3-8 секунд.Таким образом, весь процесс может занять довольно много времени.Я хотел бы иметь возможность: используя 3-5 потоков, перебирать список, вычислять время в пути и переходить к следующей записи, пока другие потоки завершаются и не создают более 5 потоков впроцесс.Независимо, у меня все работает - я могу запускать несколько потоков, я могу отслеживать количество потоков и ждать, пока максимальное количество разрешенных потоков упадет ниже предела, до следующего запуска, и может перебирать кадр данных и вычислять время диска.Тем не менее, я испытываю затруднения, соединяя все это вместе.Вот отредактированная, уменьшенная версия того, что у меня есть.

import pandas
import threading
import arcgis

class MassFunction:
    #This is intended to keep track of the active threads
    MassFunction.threadCount = 0

    def startThread(functionName,params=None):
        #This kicks off a new thread and should count up to keep track of the threads
        MassFunction.threadCount +=1

        if params is None:
            t = threading.Thread(target=functionName)
        else:
            t = threading.Thread(target=functionName,args=[params])
        t.daemon = True
        t.start()

class GeoAnalysis:
    #This class handles the connection to the ArcGIS services
    def __init__(self):
        super(GeoAnalysis, self).__init__()
        self.my_gis = arcgis.gis.GIS("https://www.arcgis.com", username, pw)

    def drivetimeCalc(self, coordsString):
        #The coords come in as a string, formatted as 'lat_1,long_1,lat_2,long_2'
        #This is the bottleneck of the process, as this calculation/response
        #below takes a few seconds to get a response
        points = coordsString.split(", ")
        route_service_url = self.my_gis.properties.helperServices.route.url
        self.route_layer = arcgis.network.RouteLayer(route_service_url, gis=self.my_gis)
        point_a_to_point_b = "{0}, {1}; {2}, {3}".format(points[1], points[0], points[3], points[2])
        result = self.route_layer.solve(stops=point_a_to_point_b,return_directions=False, return_routes=True,output_lines='esriNAOutputLineNone',return_barriers=False, return_polygon_barriers=False,return_polyline_barriers=False)
        travel_time = result['routes']['features'][0]['attributes']['Total_TravelTime']
        #This is intended to 'remove' one of the active threads 
        MassFunction.threadCount -=1
        return travel_time


class MainFunction:
    #This is to give access to the GeoAnalysis class from this class
    GA = GeoAnalysis()

    def closureDriveTimeCalc(self,coordsList):
        #This is intended to loop in the event that a fifth loop gets started and will prevent additional threads from starting
        while MassFunction.threadCount > 4:
            pass
        MassFunction.startThread(MainFunction.GA.drivetimeCalc,coordsList)

    def driveTimeAnalysis(self,location):
        #This reads a csv file containing a few thousand entries. 
        #Each entry/row contains gps coordinates, which need to be 
        #iterated over to calculate the drivetimes
        locationMemberFile = pandas.read_csv(someFileName)
        #The built-in apply() method in pandas seems to be the
        #fastest way to iterate through the rows

        locationMemberFile['DRIVETIME'] = locationMemberFile['COORDS_COL'].apply(self.closureDriveTimeCalc)

Когда я запускаю это прямо сейчас, используя VS Code, я вижу, как число потоков увеличивается до тысяч в стеке вызовов, поэтому ячувствую, что он не ждет завершения потока и добавления / вычитания из значения threadCount.Будем очень благодарны за любые идеи / предложения / советы.

РЕДАКТИРОВАТЬ: По существу, моя проблема заключается в том, как вернуть значение travel_time, чтобы оно могло быть помещено в фрейм данных.В настоящее время у меня нет оператора возврата для функции closureDriveTimeCalc, поэтому, хотя функция работает правильно, она не отправляет информацию обратно в метод apply ().

1 Ответ

0 голосов
/ 09 февраля 2019

Вместо того, чтобы делать это в заявке, я бы использовал многопроцессорную систему Pool.map :

from multiprocessing import Pool

with Pool(processes=4) as pool:
    locationMemberFile['DRIVETIME'] = pool.map(self.closureDriveTimeCalc, locationMemberFile['COORDS_COL']))
...