Фреймы данных Pyspark остаются объединенными с условиями (пространственное объединение) - PullRequest
0 голосов
/ 19 марта 2020

Я использую pyspark и создал (из txt-файлов) два кадра данных

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd
sc = spark.sparkContext
+---+--------------------+------------------+-------------------+
| id|                name|               lat|                lon|
+---+--------------------+------------------+-------------------+
|  1|.
.
.
+---+-------------------+------------------+-------------------+
| id|               name|               lat|                lon|
+---+-------------------+------------------+-------------------+
|  1||
.
.

Что я хочу, так это с помощью методов Spark получать каждую пару между элементами Фреймы данных, где их евклидово расстояние ниже определенного значения (скажем, «0,5»). Как:

record1, record2

или в любой другой форме, подобной этой, это не имеет значения.

Любая помощь будет оценена, спасибо.

1 Ответ

0 голосов
/ 19 марта 2020

Поскольку Spark не содержит каких-либо положений для геопространственных вычислений, вам нужна пользовательская функция, которая вычисляет геопространственное расстояние между двумя точками, например, с помощью формулы haversine (из здесь ):

from math import radians, cos, sin, asin, sqrt
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

@udf(returnType=FloatType())
def haversine(lat1, lon1, lat2, lon2):
    R = 6372.8

    dLat = radians(lat2 - lat1)
    dLon = radians(lon2 - lon1)
    lat1 = radians(lat1)
    lat2 = radians(lat2)

    a = sin(dLat/2)**2 + cos(lat1)*cos(lat2)*sin(dLon/2)**2
    c = 2*asin(sqrt(a))

    return R * c

Затем вы просто выполняете перекрестное соединение, обусловленное результатом вызова haversine():

df1.join(df2, haversine(df1.lat, df1.lon, df2.lat, df2.lon) < 100, 'cross') \
   .select(df1.name, df2.name)

Вам необходимо перекрестное соединение, так как Spark не может встроить Python UDF в соединение сам. Это дорого, но это то, что пользователям PySpark приходится жить.

Вот пример:

>>> df1.show()
+---------+-------------------+--------------------+
|      lat|                lon|                name|
+---------+-------------------+--------------------+
|37.776181|-122.41341399999999|AAE SSFF European...|
|38.959716|        -119.945595|Ambassador Motor ...|
| 37.66169|        -121.887367|Alameda County Fa...|
+---------+-------------------+--------------------+
>>> df2.show()
+------------------+-------------------+-------------------+
|               lat|                lon|               name|
+------------------+-------------------+-------------------+
|       34.19198813|-118.93756299999998|Daphnes Greek Cafe1|
|         37.755557|-122.25036084651899|Daphnes Greek Cafe2|
|38.423435999999995|         -121.41361|       Laguna Pizza|
+------------------+-------------------+-------------------+
>>> df1.join(df2, haversine(df1.lat, df1.lon, df2.lat, df2.lon) < 100, 'cross') \
       .select(df1.name.alias("name1"), df2.name.alias("name2")).show()
+--------------------+-------------------+
|               name1|              name2|
+--------------------+-------------------+
|AAE SSFF European...|Daphnes Greek Cafe2|
|Alameda County Fa...|Daphnes Greek Cafe2|
|Alameda County Fa...|       Laguna Pizza|
+--------------------+-------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...