Spark saveAsTable создает исключение NoSuchTableException - PullRequest
0 голосов
/ 09 октября 2019

Я использую pyspark (Spark 2.3.2) saveAsTable следующим образом:

df.write.format("parquet") \
  .sortBy("id") \
  .bucketBy(50, "some_column") \
  .option("path", "test_table.parquet") \
  .saveAsTable("test_table", mode="overwrite")

В случае, когда таблица уже существует (поэтому режим «перезаписать»), это приводит к NoSuchTableException:

org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'test_table' not found in database 'test_database';
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:184)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:927)
at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:73)
at org.apache.spark.sql.execution.datasources.CatalogFileIndex.listFiles(CatalogFileIndex.scala:59)
at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions$lzycompute(DataSourceScanExec.scala:189)
at org.apache.spark.sql.execution.FileSourceScanExec.org$apache$spark$sql$execution$FileSourceScanExec$$selectedPartitions(DataSourceScanExec.scala:186)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:308)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:295)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:315)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:107)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:102)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:43)
at org.apache.spark.sql.execution.CacheManager.org$apache$spark$sql$execution$CacheManager$$recacheByCondition(CacheManager.scala:145)
at org.apache.spark.sql.execution.CacheManager$$anonfun$recacheByPath$1.apply$mcV$sp(CacheManager.scala:201)
at org.apache.spark.sql.execution.CacheManager$$anonfun$recacheByPath$1.apply(CacheManager.scala:194)
at org.apache.spark.sql.execution.CacheManager$$anonfun$recacheByPath$1.apply(CacheManager.scala:194)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:67)
at org.apache.spark.sql.execution.CacheManager.recacheByPath(CacheManager.scala:194)
at org.apache.spark.sql.internal.CatalogImpl.refreshByPath(CatalogImpl.scala:508)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:174)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:532)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:433)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Похоже, существующая таблица была успешно удалена , но следующая попытка создать новую таблицу , похоже, требует существования таблицы (см. 2-ю строку трассировки стека). Это ошибка или я что-то пропустил?

1 Ответ

0 голосов
/ 10 октября 2019

Кажется, что он работает нормально в Spark 2.4 , в сущности попытался создать примерный фрейм данных и затем записать его в Hive через spark.

from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('Magesh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = spark.createDataFrame(people)
schemaPeople.write.format("parquet").saveAsTable("test_table_spark", mode="overwrite")

После успешной записи проверил, чтобы проверить таблицу Hive, а затем изменил фрейм данных и сделал то же самое saveAsTable данные были перезаписаны новым фреймом данных

l = [('Ankit',25),('Jalfaizy',22),('Suresh',20),('Bala',26)]

можетпожалуйста, попробуйте то же самое в вашей оболочке искры и посмотрите, сработает ли она ...

Попытка сделать то же самое во внешней таблице кустов

>>> schemaPeople.show()
+---+--------+
|age|    name|
+---+--------+
| 25|   Ankit|
| 22|Jalfaizy|
| 20|  Suresh|
| 26|    Bala|
+---+--------+

>>> spark.sql("SELECT * FROM EXT_Table_Test").show()
+---+--------+
|age|    name|
+---+--------+
| 25|   Ankit|
| 22|Jalfaizy|
| 20|  Magesh|
| 26|    Bala|
+---+--------+

>>> schemaPeople.write.format("parquet") \
...   .option("path", "hdfs://path/tables/EXT_Table_Test") \
...   .saveAsTable("test_table", mode="overwrite")

Повторное чтение обновленной таблицы вызвалоследующая ошибка

Причина: java.io.FileNotFoundException: файл не существует: hdfs: /// tables / EXT_Table_Test / 000000_0 Возможно, базовые файлы были обновлены. Вы можете явно аннулировать кэш в Spark, запустив команду «REFRESH TABLE tableName» в SQL или воссоздав соответствующий набор данных / DataFrame.

>>> spark.sql("SELECT * FROM EXT_Table_Test").show()

После выполнения REFRESH TABLE чтение из Spark прошло успешно, хотя перед обновлением я смог увидеть обновленные данные в оболочке HIVE.

>>> spark.sql("REFRESH TABLE EXT_Table_Test")
DataFrame[]
>>> spark.sql("SELECT * FROM EXT_Table_Test").show()
+---+--------+
|age|    name|
+---+--------+
| 25|   Ankit|
| 22|Jalfaizy|
| 20|  Suresh|
| 26|    Bala|
+---+--------+
...