Spark ML_pipelines: управление чтением таблиц - PullRequest
3 голосов
/ 28 мая 2019

Я использую Spark ML_pipelines для простого развертывания операций, которые я разработал в Sparklyr, в производственной среде с использованием SCALA. Он работает довольно хорошо, за исключением одной части: кажется, что когда я читаю таблицу из Hive и затем создаю конвейер, который применяет операции к этой таблице, конвейер также сохранит операцию чтения таблицы и, следовательно, имя таблицы , Однако я хочу, чтобы трубопровод был независим от этого.

Вот воспроизводимый пример:

Sparklyr часть:

sc = spark2_context(memory = "4G")

iris <- copy_to(sc, iris, overwrite=TRUE)

spark_write_table(iris, "base.iris")
spark_write_table(iris, "base.iris2")
df1 <- tbl(sc, "base.iris")

df2 <- df1 %>%
  mutate(foo = 5)

pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(df2) %>%
  ml_fit(df1)

ml_save(pipeline,
        paste0(save_pipeline_path, "test_pipeline_reading_from_table"),
        overwrite = TRUE)

df2 <- pipeline %>% ml_transform(df1)

dbSendQuery(sc, "drop table base.iris")

SCALA часть:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val df1 = spark.sql("select * from base.iris2") 

val pipeline = PipelineModel.load(pipeline_path + "/test_pipeline_reading_from_table")
val df2 = pipeline.transform(df1)

Я получаю эту ошибку:

org.apache.spark.sql.AnalysisException: Table or view not found: `base`.`iris`; line 2 pos 5;
'Project ['Sepal_Length, 'Sepal_Width, 'Petal_Length, 'Petal_Width, 'Species, 5.0 AS foo#110]
+- 'UnresolvedRelation `base`.`iris`

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:82)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:637)
  at org.apache.spark.ml.feature.SQLTransformer.transformSchema(SQLTransformer.scala:86)
  at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
  at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:310)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
  at org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:310)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:304)
  ... 71 elided

Я вижу 2 решения:

  1. Кажется, что сохранение dataframe было бы решением, но тогда мне нужно было бы найти способ не перегружать мою память, поэтому мой вопрос о unpersisting

  2. Передача имени таблицы в Hive в качестве параметра конвейера, который я пытаюсь решить в этот вопрос

Теперь, когда все это сказано, я могу что-то упустить, так как я только начинающий ...

РЕДАКТИРОВАТЬ: это значительно отличается от этого вопроса , поскольку это касается конкретной проблемы интеграции кадра данных, который был только что прочитан в конвейере, как указано в заголовке.

РЕДАКТИРОВАТЬ: что касается моего проекта, сохранение таблиц после их чтения является жизнеспособным решением. Я не знаю, есть ли лучшее решение.

1 Ответ

0 голосов
/ 28 мая 2019

Тогда конвейер назовет мою таблицу «base.table», что сделает невозможным ее применение к другой таблице.

На самом деле это не так.ft_dplyr_transformer - это синтаксический сахар для собственной Спарки SQLTransformer.Внутренне выражение dplyr преобразуется в запрос SQL, а имя таблицы заменяется на __THIS__ (заполнитель Spark, ссылающийся на текущую таблицу).

Допустим, у вас есть преобразованиекак этот:

copy_to(sc, iris, overwrite=TRUE)

df <- tbl(sc, "iris") %>%
  mutate(foo = 5)

pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(df) %>%
  ml_fit(tbl(sc, "iris"))

ml_stage(pipeline, "dplyr_transformer") %>% spark_jobj() %>% invoke("getStatement")
[1] "SELECT `Sepal_Length`, `Sepal_Width`, `Petal_Length`, `Petal_Width`, `Species`, 5.0 AS `foo`\nFROM `__THIS__`"

Это, однако, довольно запутанный способ выражения вещей, и имеет больше смысла напрямую использовать собственный преобразователь SQL:

pipeline <- ml_pipeline(sc) %>%
  ft_sql_transformer("SELECT *, 5 as `foo` FROM __THIS__") %>%
  ml_fit(df)

Редактировать :

Проблема, с которой вы столкнулись, выглядит как ошибка.get_base_name функция возвращает имя таблицы без кавычек, поэтому значение в вашем случае будет

> get_base_name(x$ops)
<IDENT> default.iris

, а шаблон будет

> pattern
[1] "\\bdefault.iris\\b"

Однако dbplyr::sql_render возвращает полностью квалифицированное имя в кавычках:

> dbplyr::sql_render(x)
<SQL> SELECT `Sepal_Length`, `Sepal_Width`, `Petal_Length`, `Petal_Width`, `Species`, 5.0 AS `foo`
FROM `default`.`iris`

Таким образом, шаблон не соответствует имени.

...