Вот как я это сделал.
Сначала я создал фрейм данных.
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import StringType
data = [(0, -1.0), (1, 0.0), (2, 0.5), (3, 1.0), (4, 10.0),(5, 25.0),(6, 100.0),(7, 300.0),(8,float("nan"))]
df = spark.createDataFrame(data, ["id", "value"])
splits = [-float("inf"),0,0.001, 1, 5,10, 20, 30, 40, 50, 60, 70, 80, 90, 100, float("inf")]
# here I created a dictionary with {index: name of split}
splits_dict = {i:splits[i] for i in range(len(splits))}
Затем я создал пакетизатор как отдельную переменную.
# create bucketizer
bucketizer = Bucketizer(splits=splits, inputCol="value",outputCol="result")
# bucketed dataframe
bucketed = bucketizer.setHandleInvalid('skip').transform(df)
Чтобы получить метки, я просто применил функцию замены, используя диктовку, которую мы определили ранее.
bucketed = bucketed.replace(to_replace=splits_dict, subset=['result'])
bucketed.show()
выход:
+---+-----+---------+
| id|value| result|
+---+-----+---------+
| 0| -1.0|-Infinity|
| 1| 0.0| 0.0|
| 2| 0.5| 0.001|
| 3| 1.0| 1.0|
| 4| 10.0| 10.0|
| 5| 25.0| 20.0|
| 6|100.0| 100.0|
| 7|300.0| 100.0|
+---+-----+---------+