Сбой программы Pyspark в теле функции - PullRequest
0 голосов
/ 02 мая 2018

Я новичок в реализации PySpark алгоритмов ML. Я пытаюсь создать модель классификации, которая может быть любой из Логистики, Случайного леса, Мультиклассового классификатора и т. Д. Я написал код с RandomForestClassifier на данный момент.

Проблема: шаги предварительной обработки данных StandardScaler запускается при построчном запуске, но при попытке запуска при вызове функции происходит сбой. Пример кода и описание ошибки:

def preprocessData(data='', all_cols='', categoricalcols=''):
    #Preprocessing the customer data
    print("Preparing the data for model fitting.")
    #Assembling the Dataframe
    print("Assembling the data.")
    assembler = VectorAssembler(inputCols=all_cols, outputCol="assmbldFeatures")

    #Standardize the data
    print("Standardizing and Scaling the data.")
    stdScaler = StandardScaler(inputCol=assembler.getOutputCol(),outputCol="stdScldFeatures",withStd=True, withMean=False)

    #Min Max scaler
    print("Performing min max scaling.")
    minMaxScaler = MinMaxScaler(inputCol=stdScaler.getOutputCol(), outputCol="minMaxScldFeatures")

    pipeline = Pipeline(stages=[assembler,stdScaler,minMaxScaler])
    prcsdData= pipeline.fit(data).transform(data)
    return prcsdData

Даже при использовании конвейера и последовательности шагов происходит сбой. Добрый совет. Я в основном пытаюсь модульный код.

error:
Performing min max scaling.
[Stage 0:==============================================>          (13 + 2) / 16]18/05/02 05:13:42 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 16, lx955.mutualofomaha.com, executor 2): java.lang.IllegalArgumentException: requirement failed: Vector should have dimension larger than zero.
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:74)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:67)
        at org.apache.spark.mllib.feature.StandardScaler$$anonfun$2.apply(StandardScaler.scala:58)
        at org.apache.spark.mllib.feature.StandardScaler$$anonfun$2.apply(StandardScaler.scala:58)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)     

<- Пример данных ->

|incm|incm_2|med|age
|56  |  181 |18 |47
|46  |  198 |31 |54
|91  |  742 |54 |45
|54  |  187 |55 |57
|58  |  123 |61 |48
|50  |  196 |49 |44
|32  |  145 |39 |55
|30  |  101 |29 |55
|62  |  177 |44 |53
|53  |  140 |30 |57
...