Я пытаюсь обновить свой код для нового коннектора spark-bigquery до версии 0.15.1-beta, и я обнаружил, что формат delta больше не работает.
Я не могу ни читать, ни писать, используя дельта-формат.
Здесь вы можете найти минимальный пример записи фрейма данных с использованием дельта-формата:
scala код
import org.apache.spark.sql.SparkSession
// Minimal reproduction: writes a two-row DataFrame in the "delta" format.
// NOTE: the `App` trait is kept deliberately — the stack trace below refers to
// `scala.App.main` / `delayedInit` and to these exact source line numbers.
object Delta extends App {
// Local Spark session, no cluster required.
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
// Tiny in-memory dataset; the content is irrelevant to the bug.
val df = Seq(("hi",1),("bye",2)).toDF("first","second")
val output = "/tmp/test"
val format = "delta"
// This `save` call is where the NoSuchMethodError from the trace is thrown.
df.write.format(format).save(output)
}
Если я использую следующую конфигурацию, код работает без проблем
build.sbt
name := "delta-gcs"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
// delta-core 0.6.0 requires jackson-module-scala's ScalaObjectMapper at runtime.
libraryDependencies += "io.delta" %% "delta-core" % "0.6.0"
// With 0.14.0-beta the delta write succeeds; the conflict appears only in 0.15.x.
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.14.0-beta"
Но если я меняю версию spark-bigquery на новую:
// This version triggers the NoSuchMethodError below: it appears to pull in a
// jackson-module-scala that is binary-incompatible with what delta-core 0.6.0
// expects (ScalaObjectMapper.$init$ is missing) — presumably an unshaded
// transitive dependency; verify against the connector's dependency tree.
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.15.1-beta"
Я получаю следующую ошибку:
Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper.$init$(Lcom/fasterxml/jackson/module/scala/experimental/ScalaObjectMapper;)V
at org.apache.spark.sql.delta.util.JsonUtils$$anon$1.<init>(JsonUtils.scala:27)
at org.apache.spark.sql.delta.util.JsonUtils$.<init>(JsonUtils.scala:27)
at org.apache.spark.sql.delta.util.JsonUtils$.<clinit>(JsonUtils.scala)
at org.apache.spark.sql.delta.DeltaOperations$Write.$anonfun$parameters$1(DeltaOperations.scala:58)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.delta.DeltaOperations$Write.<init>(DeltaOperations.scala:58)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:66)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at Delta$.delayedEndpoint$Delta$1(Delta.scala:15)
at Delta$delayedInit$body.apply(Delta.scala:3)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at Delta$.main(Delta.scala:3)
at Delta.main(Delta.scala)
Нечто подобное происходит и при попытке чтения из дельта-формата.