Это не совсем то, как предполагается использовать Spark ML Pipelines. В общем, все преобразования, необходимые для преобразования входного набора данных в формат, подходящий для Pipeline
, должны применяться заранее, и только общие компоненты должны быть встроены как stages
.
При использовании нативного (Scala) API технически возможно, в таких простых случаях, как этот, использовать пустой SQLTransformer
:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.ml.param.ParamPair
val df = spark.range(1, 4).toDF("a")
val sqlTransformer = new SQLTransformer()
val pipeline = new Pipeline().setStages(Array(sqlTransformer))
и поставка statement
Param
для обоих fit
val model = pipeline.fit(
df,
ParamPair(sqlTransformer.statement, "SELECT *, 4 AS `test` FROM __THIS__")
)
model.transform(df).show
+---+----+
| a|test|
+---+----+
| 1| 4|
| 2| 4|
| 3| 4|
+---+----+
и transform
:
model.transform(
df,
ParamPair(sqlTransformer.statement, "SELECT *, 5 AS `test` FROM __THIS__")
).show
+---+----+
| a|test|
+---+----+
| 1| 5|
| 2| 5|
| 3| 5|
+---+----+
, но ни ml_fit
, ни ml_transform
/ ml_predict
на данный момент не поддерживают дополнительные Params
(как вы можете видеть ...
просто игнорируются).