Объединяйте таблицы в PySpark при условии: если точка находится внутри многоугольника - PullRequest
1 голос
/ 06 марта 2020

У меня есть 2 фрейма данных PySpark: один с точками df_pnt, а другой с полигонами df_poly. Поскольку я не очень знаком с PySpark, я борюсь с правильным соединением этого фрейма данных при условии, находится ли точка внутри многоугольника. Я начал с этого кода, который я построил из материалов на этой странице :

from shapely import wkt  
import numpy as np
from shapely.geometry import Polygon, Point
import matplotlib.pyplot as plt
import pandas as pd
import geopandas as gpd
from pyspark.sql.types import StringType

# Create simple data
polygon1 = Polygon([[0, 0], [.5, 0], [0.3, 0.2], [0, 0.2]])
polygon2 = Polygon([[0.6, 0], [0.6, 0.3], [0.6, 0.4], [0.7, 0.2]])
polygon3 = Polygon([[0.6, 0.5], [.5, 0.5], [0.3, 0.7], [0.4, 0.8]])
polygon4 = Polygon([[0, .5], [.2, 0.4], [0.5, 0.3], [0.5, 0.1]])

df = pd.DataFrame(data={'id':[0, 1, 2, 3],
                 'geometry':[polygon1, polygon2,   polygon3, polygon4]})
df_poly = gpd.GeoDataFrame(
    df, geometry=df['geometry']); del df


df = pd.DataFrame(data={'id':range(0,15),
                'geometry':[Point(pnt) for pnt in np.random.rand(15,2)]})
df_pnt = gpd.GeoDataFrame(
    df, geometry=df['geometry']); del df

# convert shape to str in pandas df
df_poly['wkt'] = pd.Series(
    map(lambda geom: str(geom.to_wkt()), df_poly['geometry']),
    index=df_poly.index, dtype='str')

df_pnt['wkt'] = pd.Series(
        map(lambda geom: str(geom.to_wkt()), df_pnt['geometry']),
        index=df_pnt.index, dtype='str')

# Now we create geometry column as string column in pyspark df
tmp = df_poly.drop("geometry", axis=1)
df_poly = spark.createDataFrame(tmp).cache(); del tmp

tmp = df_pnt.drop("geometry", axis=1)
df_pnt = spark.createDataFrame(tmp).cache(); del tmp

Если мы хотим построить первый полигон, мы должны запустить код

wkt.loads(df_poly.take(1)[0].wkt)

И если мы хотим проверить, содержит ли объект Polygon объект Point, нам нужна следующая строка

Polygon.contains(Point)

Вопрос в том, как обрабатывать это пользовательское условие во время процедуры соединения? df_poly намного меньше, чем точка df, поэтому я хочу также использовать вещание

UPD: Если бы мне нужно было реализовать это в geo pandas, это выглядело бы как это:

df_pnt
    id  geometry
0   0   POINT (0.08834 0.23203)
1   1   POINT (0.67457 0.19285)
2   2   POINT (0.71186 0.25128)
3   3   POINT (0.55621 0.35016)
4   4   POINT (0.79637 0.24668)
5   5   POINT (0.40932 0.37155)
6   6   POINT (0.36124 0.68229)
7   7   POINT (0.13476 0.58242)
8   8   POINT (0.41659 0.46298)
9   9   POINT (0.74878 0.78191)
10  10  POINT (0.82088 0.58064)
11  11  POINT (0.28797 0.24399)
12  12  POINT (0.40502 0.99233)
13  13  POINT (0.68928 0.73251)
14  14  POINT (0.37765 0.71518)

df_poly

        id  geometry
0   0   POLYGON ((0.00000 0.00000, 0.50000 0.00000, 0....
1   1   POLYGON ((0.60000 0.00000, 0.60000 0.30000, 0....
2   2   POLYGON ((0.60000 0.50000, 0.50000 0.50000, 0....
3   3   POLYGON ((0.00000 0.50000, 0.20000 0.40000, 0....

gpd.sjoin(df_pnt, df_poly, how="left", op='intersects')

    id_left     geometry    index_right     id_right
0   0   POINT (0.08834 0.23203)     NaN     NaN
1   1   POINT (0.67457 0.19285)     1.0     1.0
2   2   POINT (0.71186 0.25128)     NaN     NaN
3   3   POINT (0.55621 0.35016)     NaN     NaN
4   4   POINT (0.79637 0.24668)     NaN     NaN
5   5   POINT (0.40932 0.37155)     NaN     NaN
6   6   POINT (0.36124 0.68229)     2.0     2.0
7   7   POINT (0.13476 0.58242)     NaN     NaN
8   8   POINT (0.41659 0.46298)     NaN     NaN
9   9   POINT (0.74878 0.78191)     NaN     NaN
10  10  POINT (0.82088 0.58064)     NaN     NaN
11  11  POINT (0.28797 0.24399)     NaN     NaN
12  12  POINT (0.40502 0.99233)     NaN     NaN
13  13  POINT (0.68928 0.73251)     NaN     NaN
14  14  POINT (0.37765 0.71518)     2.0     2.0

data

...