Проблема объединения delta.io и spark-bigquery 0.15.x-beta - PullRequest
0 голосов
/ 05 мая 2020

Я пытаюсь обновить свой код для нового коннектора spark-bigquery до 0,15. {0,1} -beta, и я понял, что формат дельты больше не работает.

Я не могу читать или писать, используя дельта-форма.

Здесь вы можете найти минимальный пример записи фрейма данных с использованием дельта-формата:

scala код

import org.apache.spark.sql.SparkSession

object Delta extends App {

  val spark = SparkSession.builder.master("local[*]").getOrCreate()

  import spark.implicits._

  val df = Seq(("hi",1),("bye",2)).toDF("first","second")

  val output = "/tmp/test"

  val format = "delta"

  df.write.format(format).save(output)

}

Если я использую следующую конфигурацию, код работает без проблем

build.sbt

name := "delta-gcs"

version := "0.1"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "io.delta" %% "delta-core" % "0.6.0"
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.14.0-beta"

Но если я поменял версию spark-bigquery на новую:

libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.15.1-beta"

Я получил это ошибка:

Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper.$init$(Lcom/fasterxml/jackson/module/scala/experimental/ScalaObjectMapper;)V
    at org.apache.spark.sql.delta.util.JsonUtils$$anon$1.<init>(JsonUtils.scala:27)
    at org.apache.spark.sql.delta.util.JsonUtils$.<init>(JsonUtils.scala:27)
    at org.apache.spark.sql.delta.util.JsonUtils$.<clinit>(JsonUtils.scala)
    at org.apache.spark.sql.delta.DeltaOperations$Write.$anonfun$parameters$1(DeltaOperations.scala:58)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.delta.DeltaOperations$Write.<init>(DeltaOperations.scala:58)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:66)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at Delta$.delayedEndpoint$Delta$1(Delta.scala:15)
    at Delta$delayedInit$body.apply(Delta.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at Delta$.main(Delta.scala:3)
    at Delta.main(Delta.scala)

Нечто подобное происходит при попытке чтения из формата дельты

Ответы [ 2 ]

0 голосов
/ 05 мая 2020

Из-за обновлений зависимостей в версии 0.15.x обновлена ​​версия jackson. Более простой способ удовлетворить требования к версии как Spark- sql, так и коннектора BigQuery - это перейти к затененной версии коннектора, где все зависимости повторно упакованы и предоставлены как часть jar коннектора. Таким образом, нет конфликтных версий. Заштрихованная зависимость: "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.15.1-beta".

0 голосов
/ 05 мая 2020

Обе библиотеки используют xml библиотеку. Один из них использует более старую версию.

попробуйте добавить в sbt

libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"

Позднее отредактируйте:

Я вижу, что большой запросчик использует:

"com.fasterxml.jackson.core" % "jackson-databind" % "2.10.3",
"com.fasterxml.jackson.module" % "jackson-module-paranamer" % "2.10.3",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.3",

И spark- sql

<fasterxml.jackson.version>2.6.7</fasterxml.jackson.version>
<fasterxml.jackson-module-scala.version>2.6.7.1</fasterxml.jackson-module-scala.version>
<fasterxml.jackson.databind.version>2.6.7.3</fasterxml.jackson.databind.version>

Попробуйте использовать для них одну и ту же библиотеку Как принудительно установить конкретную c версию зависимости?

...