Как можно распараллелить функцию «to_file» геопанды - PullRequest
1 голос
/ 04 апреля 2019

Я пытаюсь реализовать распараллеленную функцию для Geopandas, которая берет данные одного вектора (например, Shapefile, содержащий тип данных Multipolygon) и преобразует его в стандартную клеточную сетку с размерами ячеек x и y, определяемыми пользователем.

Поскольку эта функция может приводить к серьезным проблемам с памятью (т. Е. Вызванным слишком высоким пространственным разрешением), мне было интересно, возможно ли сохранить данные итеративно в указанном предназначенном файле. Таким образом, поскольку каждый параллельный процесс выполняет функцию «GRID», один и тот же процесс может итеративно сохранять данные в добавленном режиме. Таким образом, я считаю, что у человека не будет проблем с памятью.

Вот моя "SHP_to_GRID_Function". Обратите внимание, что приведенный ниже код все еще требует, чтобы все данные, сгенерированные многопроцессорной обработкой, обрабатывались непосредственно памятью (поэтому переполнение более чем определенно для больших наборов данных).

import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Polygon
from multiprocessing import Pool
import os
from functools import partial


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def parallelize_df(gdf, func, n_cores, dx=100, dy=100, verbose=False):

    Geometries= gdf.loc[:, 'geometry'].values

    pool = Pool(processes=n_cores)
    func_partial=partial(func, dx, dy, verbose) # prod_x has only one argument x (y is fixed to 10) 

    results = pool.map(func_partial, Geometries)

    pool.close()
    pool.join()

    print(np.shape(results))

    GRID = gpd.GeoSeries(np.array(results).ravel())

    print("GRID well created") 

    return GRID

def generate_grid_from_Poligon(dx=100, dy=100, verbose=False, polygon=None):
    if verbose == True:
        info('function parallelize_df')
    else:
        None

    xmin,ymin,xmax,ymax = polygon.bounds

    lenght = dx
    wide = dy

    cols = list(np.arange(int(np.floor(xmin)), int(np.ceil(xmax)), wide))
    rows = list(np.arange(int(np.floor(ymin)), int(np.ceil(ymax)), lenght))
    rows.reverse()

    subpolygons = []
    for x in cols:
        for y in rows:
            subpolygons.append( Polygon([(x,y), (x+wide, y), (x+wide, y-lenght), (x, y-lenght)]) )


    return subpolygons


def main(GDF, n_cores='standard', dx=100, dy=100, verbose= False):
    """
    GDF: geodataframe
    n_cores: use standard or a positive numerical (int) value. It will set the number of cores to use in the multiprocessing

    args: (dx: dimension in the x coordinate to make the grid
            dy: dimenion in the y coordinate to make the grid)

    """

    if isinstance(n_cores, str):
        import multiprocessing
        N_cores = multiprocessing.cpu_count() -1

    elif isinstance(n_cores, int):

        N_cores =n_cores


    GRID_GDF = parallelize_df(GDF, generate_grid_from_Poligon, n_cores=N_cores, dx=dx, dy=dy, verbose=verbose)

    return GRID_GDF

Благодарю вас за потраченное время,

С уважением,

Филипп Леал

1 Ответ

1 голос
/ 06 апреля 2019

Я наконец-то нашел решение для моего вопроса.Он не идеален, поскольку требует нескольких процессов записи и одного окончательного процесса объединения всех временных файлов, созданных во время выполнения.

Не стесняйтесь предлагать альтернативы.

Вот решение, которое я нашел.

import numpy as np
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon
from multiprocessing import Pool, Lock, freeze_support
import os
from functools import partial
import time

def info(time_value):

    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("Time spent: ", time.time() - time_value)

def init(l):

    global lock

    lock=l

def Data_Arranger(to_filename):

    """This function concatenates and deletes temporary files. It is an arranger 
        of the multicessing data results"
    """

    Base = os.path.join(os.path.dirname(to_filename), 'temp')


    Strings = [file for file in os.listdir(Base)]

    Strings = [os.path.join(Base, S) for S in Strings]

    if not os.path.exists(os.path.dirname(to_filename)):
        os.mkdir(os.path.dirname(to_filename))

    Sq = [S for S in Strings if S.endswith('.shp')]

    gpd.GeoDataFrame(pd.concat([gpd.read_file(sq1) for sq1 in Sq]), crs=GDF.crs).to_file(to_filename)

    for sq1 in Sq:
        os.remove(sq1) 

    import shutil

    shutil.rmtree(Base, ignore_errors=True) 




def parallelize_df(gdf, func, n_cores, dx=100, dy=100, verbose=False, to_filename=None):



    Geometries= gdf.loc[:, 'geometry'].values
    crs = gdf.crs

    pool = Pool(processes=n_cores, initializer=init, initargs=(Lock(), ) )

    func_partial=partial(func, dx, dy, verbose, to_filename, crs) # prod_x has only one argument x (y is fixed to 10) 


    pool.map(func_partial, Geometries)

    pool.close()
    pool.join()


def generate_grid_from_gdf(dx=100, dy=100, verbose=False, to_filename=None, crs=None, polygon=None):
    if verbose == True:
        info(time.time())
    else:
        None

    xmin,ymin,xmax,ymax = polygon.bounds

    lenght = dx
    wide = dy

    cols = list(np.arange(int(np.floor(xmin)), int(np.ceil(xmax)), wide))
    rows = list(np.arange(int(np.floor(ymin)), int(np.ceil(ymax)), lenght))
    rows.reverse()

    subpolygons = []
    for x in cols:
        for y in rows:
            subpolygons.append( Polygon([(x,y), (x+wide, y), (x+wide, y-lenght), (x, y-lenght)]) )



    lock.acquire()

    print('parent process: ', os.getppid(), ' has activated the Lock')
    GDF = gpd.GeoDataFrame(geometry=subpolygons, crs=crs)


    to_filename = os.path.join(os.path.dirname(to_filename), 'temp',  str(os.getpid()) + '_' + str(time.time()) + '.' + os.path.basename(to_filename).split('.')[-1])

    if not os.path.exists(os.path.dirname(to_filename)):
        os.mkdir(os.path.dirname(to_filename))

    try:
        print("to_filename: ", to_filename)
        GDF.to_file(to_filename)
    except:
        print("error in the file saving")
    lock.release()

    print('parent process: ', os.getppid(), ' has unlocked')




def main(GDF, n_cores='standard', dx=100, dy=100, verbose= False, to_filename=None):
    """
    GDF: geodataframe
    n_cores: use standard or a positive numerical (int) value. It will set the number of cores to use in the multiprocessing

    dx: dimension in the x coordinate to make the grid
    dy: dimenion in the y coordinate to make the grid)
    verbose: whether or not to show info from the processing. Appliable only if applying the function not
            in Windows (LINUX, UBUNTU, etc.), or when running in separte console in Windows.

    to_filename: the path which will be used to save the resultant file.
    """

    if isinstance(n_cores, str):
        import multiprocessing
        N_cores = multiprocessing.cpu_count() -1

    elif isinstance(n_cores, int):

        N_cores =n_cores



    parallelize_df(GDF, generate_grid_from_gdf, n_cores=N_cores, dx=dx, dy=dy, verbose=verbose, to_filename=to_filename)
    Data_Arranger(to_filename)

    ####################################################################################

if "__main__" == __name__:
    freeze_support()
    GDF = gpd.read_file("Someone's_file.shp")


    to_filename = "To_file_directory/To_file_name.shp"

    dx = 500 # resampling to 500 units. Ex: assuming the coordinate reference system is in meters, this function will return polygons of the given geometries in 500m for the longitudinal dimension.

    dy = 500 # same here. Assuming CRS is in meters units, the resultant file will be have polygons of 500m in latitudinal dimension

    main(GDF, dx=dx, dy=dy, verbose=True, to_filename=to_filename)

Благодарю вас за потраченное время.

Филипп Леал

...