Ошибка конвейера PySpark при использовании индексатора и кодера - PullRequest
0 голосов
/ 13 июня 2019

Я использую банковские данные из UCI , чтобы просто создать шаблон проекта.Я следовал руководству по PySpark на их сайте документации (извините, не могу найти ссылку больше).Я продолжаю получать сообщение об ошибке при запуске через конвейер.Я загрузил данные, преобразовал типы объектов и выполнил конвейеризацию для категориальных и числовых функций.Я хотел бы получить любые отзывы о любой части кода, но особенно там, где я получаю сообщение об ошибке, чтобы я мог продолжить эту сборку.Заранее спасибо!

Пример данных

+---+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| id|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  1| 59|    admin.|married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
|  2| 56|    admin.|married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
|  3| 41|technician|married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
|  4| 55|  services|married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
|  5| 54|    admin.|married| tertiary|     no|    184|     no|  no|unknown|  5|  may|     673|       2|   -1|       0| unknown|    yes|
+---+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
only showing top 5 rows
# Convert Feature Types
df.createOrReplaceTempView("df")

df2 = spark.sql("select \
                    cast(id as int) as id, \
                    cast(age as int) as age, \
                    cast(job as string) as job, \
                    cast(marital as string) as marital, \
                    cast(education as string) as education, \
                    cast(default as string) as default, \
                    cast(balance as int) as balance, \
                    cast(housing as string) as housing, \
                    cast(loan as string) as loan, \
                    cast(contact as string) as contact, \
                    cast(day as int) as day, \
                    cast(month as string) as month, \
                    cast(duration as int) as duration, \
                    cast(campaign as int) as campaign, \
                    cast(pdays as int) as pdays, \
                    cast(previous as int) as previous, \
                    cast(poutcome as string) as poutcome, \
                    cast(deposit as string) as deposit \
                from df")

# Data Types
df2.dtypes

[('id', 'int'),
 ('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('deposit', 'string')]


 # Build Pipeline (Error is Here)
categorical_cols = ["job","marital","education","default","housing","loan","contact","month","poutcome"]
numeric_cols = ["age", "balance", "day", "duration", "campaign", "pdays","previous"]

stages = []

stringIndexer = StringIndexer(inputCol=[cols for cols in categorical_cols],
                              outputCol=[cols + "_index" for cols in categorical_cols])

encoder = OneHotEncoderEstimator(inputCols=[cols + "_index" for cols in categorical_cols],
                                 outputCols=[cols + "_classVec" for cols in categorical_cols])

stages += [stringIndexer, encoder]

label_string_id = StringIndexer(inputCol="deposit", outputCol="label")
stages += [label_string_id]

assembler_inputs = [cols + "_classVec" for cols in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]

# Run Data Through Pipeline
pipeline = Pipeline().setStages(stages)
pipeline_model = pipeline.fit(df2)
prepped_df = pipeline_model.transform(df2)

Ошибка

"Ошибка типа: недопустимое значение параметра, заданное для параметра" inputCols ". Не удалось преобразовать job_index всписок строк "

1 Ответ

1 голос
/ 13 июня 2019

Это потому, что OneHotEncoderEstimator (в отличие от устаревшего OneHotEncoder) принимает несколько столбцов и приводит к нескольким столбцам (обратите внимание, что оба параметра множественные - Cols, а не Col). Таким образом, вы должны либо обернуть каждый вызов list,

for cols in categorical_cols:
    ...
    encoder = OneHotEncoderEstimator(
      inputCols=[cols + "_index"], outputCols=[cols + "_classVec"]
    )
    ...

или лучше пропускать все столбцы одновременно, вне цикла for:

encoder = OneHotEncoderEstimator(
    inputCols=[col + "_index" for cols in categorical_cols], 
    outputCols=[col + "_classVec" for for col in categorical_cols]
)
stages += [encoder]

Если вы сомневаетесь, каков ожидаемый ввод / вывод, вы всегда можете проверить соответствующий Param:

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

OneHotEncoderEstimator.inputCols.typeConverter
## <function pyspark.ml.param.TypeConverters.toListString(value)>

StringIndexer.inputCol.typeConverter
## <function pyspark.ml.param.TypeConverters.toString(value)>

Как видите, для первого требуется объект, приводимый к списку строк, а для второго - просто строка.

...