Объединение PySpark и DBSCAN с pandas_udf - PullRequest
0 голосов
/ 07 февраля 2020

Я читал документацию по pandas_udf: Сгруппированная карта

И мне любопытно, как добавить в нее sklearn DBSCAN, например, у меня есть набор данных:

data = [(1, 11.6133, 48.1075),
         (1, 11.6142, 48.1066),
         (1, 11.6108, 48.1061),
         (1, 11.6207, 48.1192),
         (1, 11.6221, 48.1223),
         (1, 11.5969, 48.1276),
         (2, 11.5995, 48.1258),
         (2, 11.6127, 48.1066),
         (2, 11.6430, 48.1275),
         (2, 11.6368, 48.1278),
         (2, 11.5930, 48.1156)]

df = spark.createDataFrame(data, ["id", "X", "Y"])

И я хотел бы сгруппировать id и выполнить кластеризацию DBSCAN для каждого id отдельно.

@pandas_udf("id long, X double, Y double", PandasUDFType.GROUPED_MAP)
def dbscan_udf(...):
    # pdf is a pandas.DataFrame
    v = ...
    return ...

df.groupby("id").apply(dbscan_udf).show()

Результат, который я ищу, это оригинальный набор данных со столбцом cluster, который показывает точки рядом друг с другом id.

Спасибо за помощь!

Ответы [ 2 ]

0 голосов
/ 10 февраля 2020

Так что мне удалось сделать это самостоятельно:

from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
from sklearn.cluster import DBSCAN

data = [(1, 11.6133, 48.1075),
         (1, 11.6142, 48.1066),
         (1, 11.6108, 48.1061),
         (1, 11.6207, 48.1192),
         (1, 11.6221, 48.1223),
         (1, 11.5969, 48.1276),
         (2, 11.5995, 48.1258),
         (2, 11.6127, 48.1066),
         (2, 11.6430, 48.1275),
         (2, 11.6368, 48.1278),
         (2, 11.5930, 48.1156)]

df = spark.createDataFrame(data, ["id", "X", "Y"])

output_schema = StructType(
            [
                StructField('id', StringType()),
                StructField('X', DoubleType()),
                StructField('Y', DoubleType()),
                StructField('cluster', IntegerType())
             ]
    )

@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
def dbscan_pandas_udf(data):
    data["cluster"] = DBSCAN(eps=5, min_samples=3).fit_predict(data[["X", "Y"]])
    result = pd.DataFrame(data, columns=["id", "X", "Y", "cluster"])
    return result

data.groupby("id").apply(dbscan_udf).show()

Надеюсь, это может кому-то помочь в будущем.

0 голосов
/ 09 февраля 2020

Я считаю, что это сделано так.

# Sum
df.groupBy('id').sum().show()

Или, если ваша версия Spark старая, попробуйте это.

(df
    .groupBy("id")
    .agg(func.col("id"), func.sum("order_item"))
    .show())

См. Ссылку ниже для получения дополнительной информации, связанной с DBSCAN.

https://github.com/alitouka/spark_dbscan

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...