Невозможно передать StringIndexer как список на этап конвейера модели - PullRequest
0 голосов
/ 19 июня 2020

Конвейер PySpark является для меня новым. Я пытаюсь создать этапы конвейера, передав следующий список:

pipeline = Pipeline().setStages([indexer,assembler,dtc_model])

где я применяю индексацию функций для нескольких столбцов:

cat_col = ['Gender','Habit','Mode']

indexer = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(training_data_0) for column in cat_col ]

При выполнении подгонки на конвейере появляется ошибка ниже:

model_pipeline = pipeline.fit(train_df)

Как мы можем передать список на сцену или какую-либо работу, чтобы достичь этого или лучшего способа сделать это?

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<command-3999694668013877> in <module>
----> 1 model_pipeline = pipeline.fit(train_df)

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
     95             if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)):
     96                 raise TypeError(
---> 97                     "Cannot recognize a pipeline stage of type %s." % type(stage))
     98         indexOfLastEstimator = -1
     99         for i, stage in enumerate(stages):

TypeError: Cannot recognize a pipeline stage of type <class 'list'>.```

1 Ответ

1 голос
/ 20 июня 2020

Попробуйте ниже -

cat_col = ['Gender','Habit','Mode']
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(training_data_0) for column in cat_col ]

assembler = VectorAssembler...
dtc_model = DecisionTreeClassifier...

# Create pipeline using transformers and estimators
stages = indexer
stages.append(assembler)
stages.append(dtc_model)
pipeline = Pipeline().setStages(stages)

model_pipeline = pipeline.fit(train_df)
...