Я использую Spark ML_pipelines
для простого развертывания операций, которые я разработал в Sparklyr
, в производственной среде с использованием SCALA
. Он работает довольно хорошо, за исключением одной части: кажется, что когда я читаю таблицу из Hive
и затем создаю конвейер, который применяет операции к этой таблице, конвейер также сохранит операцию чтения таблицы и, следовательно, имя таблицы , Однако я хочу, чтобы трубопровод был независим от этого.
Вот воспроизводимый пример:
Sparklyr
часть:
sc = spark2_context(memory = "4G")
iris <- copy_to(sc, iris, overwrite=TRUE)
spark_write_table(iris, "base.iris")
spark_write_table(iris, "base.iris2")
df1 <- tbl(sc, "base.iris")
df2 <- df1 %>%
mutate(foo = 5)
pipeline <- ml_pipeline(sc) %>%
ft_dplyr_transformer(df2) %>%
ml_fit(df1)
ml_save(pipeline,
paste0(save_pipeline_path, "test_pipeline_reading_from_table"),
overwrite = TRUE)
df2 <- pipeline %>% ml_transform(df1)
dbSendQuery(sc, "drop table base.iris")
SCALA
часть:
import org.apache.spark.ml.PipelineModel
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
val df1 = spark.sql("select * from base.iris2")
val pipeline = PipelineModel.load(pipeline_path + "/test_pipeline_reading_from_table")
val df2 = pipeline.transform(df1)
Я получаю эту ошибку:
org.apache.spark.sql.AnalysisException: Table or view not found: `base`.`iris`; line 2 pos 5;
'Project ['Sepal_Length, 'Sepal_Width, 'Petal_Length, 'Petal_Width, 'Species, 5.0 AS foo#110]
+- 'UnresolvedRelation `base`.`iris`
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:82)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:637)
at org.apache.spark.ml.feature.SQLTransformer.transformSchema(SQLTransformer.scala:86)
at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:310)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:304)
... 71 elided
Я вижу 2 решения:
Кажется, что сохранение dataframe
было бы решением, но тогда мне нужно было бы найти способ не перегружать мою память, поэтому мой вопрос о unpersisting
Передача имени таблицы в Hive в качестве параметра конвейера, который я пытаюсь решить в этот вопрос
Теперь, когда все это сказано, я могу что-то упустить, так как я только начинающий ...
РЕДАКТИРОВАТЬ: это значительно отличается от этого вопроса , поскольку это касается конкретной проблемы интеграции кадра данных, который был только что прочитан в конвейере, как указано в заголовке.
РЕДАКТИРОВАТЬ: что касается моего проекта, сохранение таблиц после их чтения является жизнеспособным решением. Я не знаю, есть ли лучшее решение.