PySpark: Как применить Python UDF к столбцам PySpark DataFrame? - PullRequest
0 голосов
/ 06 февраля 2020

У меня есть PySpark DataFrame с двумя наборами координат широты и долготы. Я пытаюсь вычислить расстояние Хаверсайна между каждым набором координат для данной строки. Я использую следующую haversine (), которую я нашел в Интернете. Проблема в том, что его нельзя применить к столбцам, или, по крайней мере, я не знаю синтаксис для этого. Может кто-нибудь поделиться синтаксисом или указать лучшее решение?

from math import radians, cos, sin, asin, sqrt
def haversine(lat1, lon1, lat2, lon2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    # Radius of earth in miles is 3,963; 5280 ft in 1 mile
    ft = 3963 * 5280 * c
    return ft

Я знаю, что вышеописанная функция haversine () работает, потому что я протестировал ее, используя некоторые координаты широты / долготы из моего фрейма данных, и получил разумные результаты:

haversine(-85.8059, 38.250134, 
          -85.805122, 38.250098)
284.1302325439314

Когда я заменяю примерные координаты именами столбцов, соответствующими широтам / долготам в моем фрейме данных PySpark, я получаю ошибку. Я пробовал следующий код в попытке создать новый столбец, содержащий рассчитанное расстояние Хаверсайна, измеренное в футах:

df.select('id', 'p1_longitude', 'p1_latitude', 'p2_lon', 'p2_lat').withColumn('haversine_dist', 
                           haversine(df['p1_latitude'],
                                    df['p1_longitude'],
                                    df['p2_lat'],
                                    df['p2_lon']))
.show()

, но я получаю ошибку:

must be real number, not Column
Traceback (most recent call last):
  File "<stdin>", line 8, in haversine
TypeError: must be real number, not Column

Это указывает для меня это то, что я должен каким-то образом итеративно применять свою функцию haversine к каждой строке моего PySpark DataFrame, но я не уверен, правильно ли это предположение, и даже если это так, я не знаю, как это сделать. Кроме того, мои lat / lons относятся к типу float.

1 Ответ

1 голос
/ 06 февраля 2020

Не используйте UDF, когда вы можете использовать встроенные функции Spark, так как они обычно менее производительны.

Вот решение, использующее только функции Spark SQL, которые выполняют те же функции, что и ваша функция:

from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos

df.withColumn("dlon", radians(col("p2_lon")) - radians(col("p1_longitude"))) \
  .withColumn("dlat", radians(col("p2_lat")) - radians(col("p1_latitude"))) \
  .withColumn("haversine_dist", asin(sqrt(
                                         sin(col("dlat") / 2) ** 2 + cos(radians(col("p1_latitude")))
                                         * cos(radians(col("p2_lat"))) * sin(col("dlon") / 2) ** 2
                                         )
                                    ) * 2 * 3963 * 5280) \
  .drop("dlon", "dlat")\
  .show(truncate=False)

Дает:

+-----------+------------+----------+---------+------------------+
|p1_latitude|p1_longitude|p2_lat    |p2_lon   |haversine_dist    |
+-----------+------------+----------+---------+------------------+
|-85.8059   |38.250134   |-85.805122|38.250098|284.13023254857814|
+-----------+------------+----------+---------+------------------+

Вы можете найти доступную Spark встроенные функции здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...