Эффективно рассчитать 2D гистограмму с помощью Pyspark (Numpy и UDF) - PullRequest
0 голосов
/ 31 марта 2020

Я пытаюсь сделать что-то действительно простое, что каким-то образом превращается во что-то действительно сложное, когда задействован Pyspark.

У меня действительно большой фрейм данных (~ 2B строк) на нашей платформе, который мне запрещен скачать, но анализировать только с использованием кода Pyspark. Фрейм данных содержит положение некоторых объектов над Европой за последний год, и я хочу вычислить плотность этих объектов с течением времени. В прошлом я успешно использовал функцию numpy.histogram2d с хорошими результатами (по крайней мере, быстрее, чем в numpy). Поскольку в * 1005 нет эквивалента этой функции, я определил UDF для вычисления плотности и возврата нового кадра данных. Это работает, когда я обрабатываю только несколько строк (я пробовал с 100К строк):

import pandas as pd
import numpy as np

def compute_density(df):
    lon_bins = np.linspace(-15, 45, 100)
    lat_bins = np.linspace(35, 70, 100)

    density, xedges, yedges = np.histogram2d(df["corrected_latitude_degree"].values,
                                             df["corrected_longitude_degree"].values,
                                             [lat_bins, lon_bins])
    x2d, y2d = np.meshgrid(xedges[:-1], yedges[:-1])
    x_out = x2d.ravel()
    y_out = y2d.ravel()
    density_out = density.ravel()
    data = {
            'latitude': x_out,
            'longitude': y_out,
            'density': density_out
            }
    return pd.DataFrame(data)

, который я затем называю

schema = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("density", DoubleType())
])

preproc = (
    inp
    .limit(100000)
    .withColumn("groups", F.lit(0))
)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_density_udf(df):
    return compute_density(df)

result = preproc.groupby(["groups"]).apply(compute_density_udf)

Почему я использую GROUPED_MAP версия для применения UDF? Мне не удалось заставить его работать с UDF типа SCALAR при возвращении со схемой, хотя на самом деле мне не нужно группировать.

Когда я пытаюсь использовать этот UDF для полного набора данных, я получить OOM, потому что я считаю, что есть только одна группа и слишком много для UDF для обработки. Я уверен, что есть более разумный способ вычислить это напрямую с помощью pyspark без UDF или альтернативно разделить на группы, а затем собрать результаты в конце? У кого-нибудь есть идеи / предложения?

...