Я пытаюсь сделать что-то действительно простое, что каким-то образом превращается во что-то действительно сложное, когда задействован Pyspark.
У меня действительно большой фрейм данных (~ 2B строк) на нашей платформе, который мне запрещен скачать, но анализировать только с использованием кода Pyspark. Фрейм данных содержит положение некоторых объектов над Европой за последний год, и я хочу вычислить плотность этих объектов с течением времени. В прошлом я успешно использовал функцию numpy.histogram2d
с хорошими результатами (по крайней мере, быстрее, чем в numpy
). Поскольку в * 1005 нет эквивалента этой функции, я определил UDF для вычисления плотности и возврата нового кадра данных. Это работает, когда я обрабатываю только несколько строк (я пробовал с 100К строк):
import pandas as pd
import numpy as np
def compute_density(df):
lon_bins = np.linspace(-15, 45, 100)
lat_bins = np.linspace(35, 70, 100)
density, xedges, yedges = np.histogram2d(df["corrected_latitude_degree"].values,
df["corrected_longitude_degree"].values,
[lat_bins, lon_bins])
x2d, y2d = np.meshgrid(xedges[:-1], yedges[:-1])
x_out = x2d.ravel()
y_out = y2d.ravel()
density_out = density.ravel()
data = {
'latitude': x_out,
'longitude': y_out,
'density': density_out
}
return pd.DataFrame(data)
, который я затем называю
schema = StructType([
StructField("latitude", DoubleType()),
StructField("longitude", DoubleType()),
StructField("density", DoubleType())
])
preproc = (
inp
.limit(100000)
.withColumn("groups", F.lit(0))
)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_density_udf(df):
return compute_density(df)
result = preproc.groupby(["groups"]).apply(compute_density_udf)
Почему я использую GROUPED_MAP
версия для применения UDF? Мне не удалось заставить его работать с UDF типа SCALAR
при возвращении со схемой, хотя на самом деле мне не нужно группировать.
Когда я пытаюсь использовать этот UDF для полного набора данных, я получить OOM, потому что я считаю, что есть только одна группа и слишком много для UDF для обработки. Я уверен, что есть более разумный способ вычислить это напрямую с помощью pyspark
без UDF или альтернативно разделить на группы, а затем собрать результаты в конце? У кого-нибудь есть идеи / предложения?