UnsatisfiedLinkError в Apache Spark при записи Parquet в AWS S3 с помощью Staging S3A Committer - PullRequest
1 голос
/ 09 мая 2020

Я пытаюсь записать данные Parquet в каталог AWS S3 с помощью Apache Spark. Я использую свой локальный компьютер на Windows 10 без установленных Spark и Had oop, а добавляю их как зависимость SBT (Had oop 3.2.1, Spark 2.4.5). Мой SBT ниже:

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "org.apache.spark" %% "spark-hadoop-cloud" % "2.3.2.3.1.0.6-1",

  "org.apache.hadoop" % "hadoop-client" % "3.2.1",
  "org.apache.hadoop" % "hadoop-common" % "3.2.1",
  "org.apache.hadoop" % "hadoop-aws" % "3.2.1",

  "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.704"
)

dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"
)

resolvers ++= Seq(
  "apache" at "https://repo.maven.apache.org/maven2",
  "hortonworks" at "https://repo.hortonworks.com/content/repositories/releases/",
)

Я использую S3A Staging Directory Committer, как описано в документации Had oop и Cloudera . Мне также известны эти два вопроса по StackOverflow, и я использовал их для правильной настройки:

Я добавил все необходимые (на мой взгляд) конфигурации, включая последние две c для Parquet:

val spark = SparkSession.builder()
      .appName("test-run-s3a-commiters")
      .master("local[*]")

      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
      .config("spark.hadoop.fs.s3a.connection.maximum", "100")

      .config("spark.hadoop.fs.s3a.committer.name", "directory")
      .config("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
      .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
      .config("spark.hadoop.fs.s3a.committer.staging.unique-filenames", "true")
      .config("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
      .config("spark.hadoop.fs.s3a.buffer.dir", "tmp/")
      .config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "hdfs_tmp/")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")

      .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      .config("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      .getOrCreate()

spark.sparkContext.setLogLevel("info")

Из журналов я вижу, что StagingCommitter действительно применяется (также я могу видеть промежуточные данные в моей локальной файловой системе по указанным путям, а не _ Contemporary каталог в S3 во время выполнения, как если бы он был по умолчанию FileOutputCommitter ).

Затем я запускаю простой код для записи тестовых данных в корзину S3:

import spark.implicits._

val sourceDF = spark
  .range(0, 10000)
  .map(id => {
    Thread.sleep(10)
    id
  })

sourceDF
  .write
  .format("parquet")
  .save("s3a://my/test/bucket/")

(я использую Thread.sleep для имитации некоторой обработки и у меня мало времени, чтобы проверить промежуточное содержимое моего локального временного каталога и корзины S3)

Однако я получаю java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat er ror во время попытки фиксации задачи. Ниже представлен фрагмент логов (уменьшенный до 1 исполнителя) и трассировка стека ошибок.

20/05/09 15:13:18 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 15000
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0: duration 0:00.005s
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0: duration 0:00.019s
20/05/09 15:13:18 ERROR Utils: Aborting task
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Ljava/lang/String;)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat;
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:460)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:821)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:735)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:703)
    at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2091)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2071)
    at org.apache.hadoop.fs.FileSystem$5.hasNext(FileSystem.java:2190)
    at org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles(S3AUtils.java:1295)
    at org.apache.hadoop.fs.s3a.S3AUtils.flatmapLocatedFiles(S3AUtils.java:1333)
    at org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter(S3AUtils.java:1350)
    at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.getTaskOutput(StagingCommitter.java:385)
    at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.commitTask(StagingCommitter.java:641)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.commitTask(PathOutputCommitProtocol.scala:220)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/09 15:13:18 ERROR Utils: Aborting task

Согласно моему текущему пониманию, конфигурация правильная. Вероятно, ошибка вызвана несовместимостью версий или настройками моей локальной среды.

Предоставленный код работает без ошибок для OR C и CSV, но не для Parquet.

Пожалуйста , подскажите, что могло вызвать ошибку и как ее решить?

1 Ответ

2 голосов
/ 11 мая 2020

Для всех, кто сюда приходит, я нашел решение. Как и ожидалось, проблема не связана с модулями вывода S3A или зависимостями библиотек. Обертка HDFS) на моем Windows компьютере.

Я загрузил соответствующую версию из cdarlint / winutils , и все сработало. LOL

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...