Вектор объектов PySpark, допускающий значения NULL - PullRequest
0 голосов
/ 20 февраля 2019

Я хотел бы использовать классификатор в PySpark для набора данных, который содержит значения NULL.Значения NULL появляются в функциях, которые я создал, таких как Процент успеха.Мне нужно сохранить значение NULL, потому что я показал через панд, что сохранение значений NULL приводит к более сильной модели.Поэтому я не хочу вменять NULL с нулями или медианой.

Я понимаю, что Vector Assembler можно использовать для создания векторов объектов, но он не работает, когда данные содержат значения NULL.Мне было интересно, есть ли способ создать вектор объектов, который содержит значения NULL, который будет работать с LightGBMClassifier?

Я демонстрирую проблему, которая возникает у меня с данными diamonds.csv.Я использую чистую неотредактированную копию и копию, в которую я вставил нули, чтобы продемонстрировать мою проблему.

import pandas as pd
import numpy as np
import random

from mmlspark import LightGBMClassifier
from pyspark.sql.functions import * 
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator

diamondsData = pd.read_csv("/dbfs/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").iloc[:,1:] 
diamondsData_clean = diamondsData.copy()
diamondsData_clean = spark.createDataFrame(diamondsData_clean)

diamondsData['randnum'] = diamondsData.apply(lambda x: random.uniform(0, 1), axis=1)
diamondsData['depth'] = diamondsData[['depth','randnum']].apply(lambda x: np.nan if x['randnum'] < 0.05 else x['depth'], axis=1)
diamondsData_nulls = spark.createDataFrame(diamondsData)
diamondsData_nulls = diamondsData_nulls.select([when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c for c, t in diamondsData_nulls.dtypes])
diamondsData_nulls.show(10)
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+ 
|carat| cut|color|clarity|depth|table|price| x| y| z| randnum|
 +-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+ 
| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43| 0.0755707311804259| 
| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31| 0.9719186135587407| 
| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31| 0.5237755344569698| 
| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| 0.12103842271165433| 
| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75| 0.48213792315234205| 
| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48| 0.5461421401855059| 
| 0.24|Very Good| I| VVS1| null| 57.0| 336|3.95|3.98|2.47|0.013923864248332252| 
| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53| 0.551950501743583| 
| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49| 0.09444899320350808| 
| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39| 0.5246023480324566|

Затем настраиваются этапы для использования в конвейере.

categoricalColumns = ['cut', 'color', 'clarity']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Конвейер вписывается в diamondsData_clean, и данные преобразуются, возвращая столбец метки и вектор объектов, как и ожидалось.

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_clean)
diamonds_final = pipelineModel.transform(diamondsData_clean)
selectedCols = ['price', 'features']
diamonds_final = diamonds_final.select(selectedCols)
diamonds_final.printSchema()
diamonds_final.show(6)
root 
|-- price: long (nullable = true) 
|-- features: vector (nullable = true) 
+-----+--------------------+ 
|price| features| 
+-----+--------------------+ 
| 326|(23,[0,5,12,17,18...| 
| 326|(23,[1,5,10,17,18...| 
| 327|(23,[3,5,13,17,18...| 
| 334|(23,[1,9,11,17,18...| 
| 335|(23,[3,12,17,18,1...| 
| 336|(23,[2,14,17,18,1...| 
+-----+--------------------+

Однако при попытке выполнить тот же шаг в кадре данных diamondsData_nulls возвращается ошибка.

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls)
selectedCols = ['price', 'features']
diamonds_final_nulls = diamonds_final_nulls.select(selectedCols)
diamonds_final_nulls.printSchema()
diamonds_final_nulls.show(6)
root 
|-- price: long (nullable = true) 
|-- features: vector (nullable = true) 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133952.0 failed 4 times, most recent failure: Lost task 0.3 in stage 133952.0 (TID 1847847, 10.139.64.4, executor 291): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct&lt;cutclassVec:vector,colorclassVec:vector,clarityclassVec:vector,carat:double,depth:double,table:double,x:double,y:double,z:double&gt;) =&gt; vector)

Это известная проблема, над которой работаем (https://github.com/Azure/mmlspark/issues/304), но в настоящее время я не могу найти средство настройки, позволяющее пропускать значения NULL.

Пытаясь использовать параметр handleInvalid = "keep"

Пользователь Machiel предложил параметр handleInvalid функций StringIndexer и OneHotEncoderEstimator - оказывается, его также следует применять в функции VectorAssembler. Я обновил свой код как таковой:

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index',handleInvalid = "keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"],handleInvalid = "keep")
    stages += [stringIndexer, encoder]

numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features",handleInvalid="keep")
stages += [assembler]

1 Ответ

0 голосов
/ 23 августа 2019

Для строк и категориальных чисел Spark может создать корзину для пропущенных значений с помощью параметра handleInvalid:

OneHotEncoderEstimator(inputCols=..., outputCols=..., handleInvalid='keep')
StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')
...