Spark 2.x + Tika: java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect - PullRequest
0 голосов
/ 25 сентября 2018

Я пытаюсь решить проблему времени выполнения classpath при отправке с запросом для задания разбора Apache Tika (> v 1.14).Кажется, проблема заключается в том, что classpath типа spark-submit против моего uber-jar.

Платформы: CDH 5.15 ( Spark 2.3 добавлен через документы CDH ) и CDH 6 (Spark 2.2 в комплекте с CDH 6)

Я пробовал / проверял:

(Cloudera) Где spark-submit ищет файлы Jar?

(stackoverflow) разрешение проблем-зависимостей-в-apache-spark

(переполнение стека) Apache Tika ArchiveStreamFactory.detect error

Основные моменты:

  • Java 8 / Scala 2.11
  • Я создаю uber-jar и вызываю этот uber-jar с помощью spark-submit
  • Я попытался добавить опцию --jars для вызова spark-submit (см. Далее в этом посте)
  • Я попытался добавить --conf spark.driver.userClassPathFirst = true && --conf spark.executor.userClassPathFirst = true к вызову spark-submit (см. Далее в этом посте):

Результаты, если я включаю --conf flag (s) для spark-submit:

$ spark-submit --master local[*] --class com.example.App --conf spark.executor.userClassPathFirst=true ./target/uber-tikaTest-1.19.jar

18/09/25 13:35:55 ERROR util.Utils: Exception encountered
java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/09/25 13:35:55 ERROR util.Utils: Exception encountered
java.lang.NullPointerException
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:72)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1307)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:312)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Ниже приведено следующее сообщение об ошибке для файлов:

  • скрипт build-and-run.sh (вызывает spark-submit - заметки о включенных опциях)
  • пример приложения
  • pom.xml
  • mvnВывод дерева зависимостей (который показывает, что «отсутствующая» библиотека commons-compress включена в uber-jar)

Ошибка во время выполнения:

18/09/25 11:47:39 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
    at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
    at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
    at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
    at com.example.App$.tikaAutoDetectParser(App.scala:55)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/09/25 11:47:39 ERROR executor.Executor: Exception in task 5.0 in stage 0.0 (TID 5)
java.lang.NoSuchMethodError: org.apache.commons.compress.archivers.ArchiveStreamFactory.detect(Ljava/io/InputStream;)Ljava/lang/String;
    at org.apache.tika.parser.pkg.ZipContainerDetector.detectArchiveFormat(ZipContainerDetector.java:160)
    at org.apache.tika.parser.pkg.ZipContainerDetector.detect(ZipContainerDetector.java:104)
    at org.apache.tika.detect.CompositeDetector.detect(CompositeDetector.java:84)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:116)
    at org.apache.tika.parser.AutoDetectParser.parse(AutoDetectParser.java:159)
    at com.example.App$.tikaAutoDetectParser(App.scala:55)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at com.example.App$$anonfun$1.apply(App.scala:69)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1799)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2071)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

build-and-run.sh:

Примечания:

  • Я попытался добавить флаги --conf для userClassPathFirst как в основных, так и в конфигурациях пряжиниже,
  • с использованием флага --jar для указания uber-jar, сгенерированного из компиляции mvn с pom.xml (представлен далее в посте)

build-and-run.sh

mvn compile

if true
then
spark-submit --master local[*] --class com.example.App ./target/uber-tikaTest-1.19.jar
fi

# tried the using the userClass flags for driver and executor for above and below calls to spark-submit
# --conf spark.driver.userClassPathFirst=true \
# --conf spark.executor.userClassPathFirst=true \

if false 
then
spark-submit --class com.example.App \
 --master yarn \
 --packages org.apache.commons:commons-compress:1.18 \
 --jars ./target/uber-tikaTest-1.19.jar \
 --num-executors 2 \
 --executor-memory 1024m \
 --executor-cores 2 \
 --driver-memory 2048m \
 --driver-cores 1 \
 ./target/uber-tikaTest-1.19.jar
fi

Пример приложения:

package com.example
////////// Tika Imports
import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.AutoDetectParser
import org.apache.tika.sax.BodyContentHandler
////////// Java HTTP Imports 
import java.net.URL;
import java.net.HttpURLConnection
import scala.collection.JavaConverters._
import scala.collection.mutable._
////////// Spark Imports 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.{Row,SparkSession}


object App {
  case class InputStreamData(sourceURL: String, headerFields: Map[String,List[String]], inputStream: java.io.InputStream)

  def openUrlStream(sourceURL:String,apiKey:String):(InputStreamData) = {
    try {
     val url = new URL(sourceURL)
         val urlConnection = url.openConnection().asInstanceOf[HttpURLConnection] 
     urlConnection.setInstanceFollowRedirects(true)
         val headerFields = urlConnection.getHeaderFields()
         val input = urlConnection.getInputStream()
     InputStreamData(sourceURL, headerFields.asScala.map(x => (x._1,x._2.asScala.toList)), input)
    }
      catch {
      case e: Exception => {
        println("**********************************************************************************************")
        println("PARSEURL: INVALID URL: " + sourceURL)
        println(e.toString())
        println("**********************************************************************************************")

        InputStreamData(sourceURL, Map("ERROR" -> List("ERROR")), null)
      }
    }
  }

  def tikaAutoDetectParser(inputStream:java.io.InputStream):String = {
    var parser = new AutoDetectParser();
    var handler = new BodyContentHandler(-1);
    var metadata = new Metadata();
    parser.parse(inputStream, handler, metadata);
    return handler.toString()
  }

  def main(args : Array[String]) {
    var sparkConf = new SparkConf().setAppName("tika-1.19-test")
    val sc = new SparkContext(sparkConf) 
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    println("HELLO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    var urls = List("http://www.pdf995.com/samples/pdf.pdf", "https://www.amd.com/en", "http://jeroen.github.io/images/testocr.png")

    var rdd = sc.parallelize(urls)
    var parsed = rdd.map(x => tikaAutoDetectParser(openUrlStream(x,"").inputStream))
    println(parsed.count)
  }
}

pom.xml (сборка uber-jar):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>tikaTest</artifactId>
  <version>1.19</version>
  <name>${project.artifactId}</name>
  <description>Testing tika 1.19 with CDH 6 and 5.x, Spark 2.x, Scala 2.11.x</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>


 <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

<profiles>
    <profile>
        <id>scala-2.11.12</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <scalaVersion>2.11.12</scalaVersion>
            <scalaBinaryVersion>2.11.12</scalaBinaryVersion>
        </properties>
        <dependencies>
            <!-- ************************************************************************** -->
            <!-- GOOD DEPENDENCIES +++++++++++++++++++++++++++++++++++++ -->
            <!-- ************************************************************************** -->

            <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-compress</artifactId>
                <version>1.18</version>
            </dependency>

            <!-- *************** CDH flavored dependencies ***********************************************-->
            <!-- https://www.cloudera.com/documentation/spark2/latest/topics/spark2_packaging.html#versions -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.2.0.cloudera3</version>
                <!-- have tried scope provided / compile -->
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.2.0.cloudera3</version>
                    <!-- have tried scope provided / compile -->
                    <!--<scope>provided</scope>-->
                </dependency>

                <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-core -->
                <dependency>
                    <groupId>org.apache.tika</groupId>
                    <artifactId>tika-core</artifactId>
                    <version>1.19</version>
                </dependency>

                <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-parsers -->
                <dependency>
                    <groupId>org.apache.tika</groupId>
                    <artifactId>tika-parsers</artifactId>
                    <version>1.19</version>
                </dependency>

                <!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api -->
                <dependency>
                    <groupId>javax.ws.rs</groupId>
                    <artifactId>javax.ws.rs-api</artifactId>
                    <version>2.1.1</version>
                </dependency>

            <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
                <dependency>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                    <version>2.11.12</version>
                </dependency>


            <!-- **************************************************************************************************************************
            **************************** alternative dependencies that have been tried and yield same Tika error***************************
            *******************************************************************************************************************************-->
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
                <!--
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
                    <version>2.2.0</version>
                </dependency>
                -->

            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
                <!--
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.11</artifactId>
                    <version>2.2.0</version>
                </dependency>
                -->

            </dependencies>
        </profile>
    </profiles>


    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <!-- work-around for https://issues.scala-lang.org/browse/SI-8358 -->
                        <arg>-nobootcp</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <finalName>uber-${project.artifactId}-${project.version}</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

дерево зависимостей mvn:

Примечания:

зависимость mvn: tree -Ddetail = true |grep compress

[INFO] +- org.apache.commons:commons-compress:jar:1.18:compile
[INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile

$ mvn зависимость: tree -Ddetail = true |grep commons

[INFO] +- org.apache.commons:commons-compress:jar:1.18:compile
[INFO] | | | \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.5:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] | +- commons-net:commons-net:jar:2.2:compile
[INFO] | +- org.apache.commons:commons-crypto:jar:1.0.0:compile
[INFO] | | +- org.codehaus.janino:commons-compiler:jar:3.0.8:compile
[INFO] | | \- commons-lang:commons-lang:jar:2.6:compile
[INFO] | +- commons-codec:commons-codec:jar:1.11:compile
[INFO] | | \- org.apache.commons:commons-collections4:jar:4.2:compile
[INFO] | +- org.apache.commons:commons-exec:jar:1.3:compile
[INFO] | +- commons-io:commons-io:jar:2.6:compile
[INFO] | +- org.apache.commons:commons-csv:jar:1.5:compile

1 Ответ

0 голосов
/ 20 апреля 2019

Что является источником исключения?

Это результат конфликта зависимостей.

Почему я не могу решить эту проблему с помощью моего POM.XML?

Потому что это не внутренний конфликт в вашем jar-файле.Это конфликт с Apache Spark.

Что именно не так?

Spark 2.x дистрибутивы включают старые версии commons-compress, в то время как библиотека Tika зависит от версии 1.18 библиотеки commons-compress.

Решение

Используйте аргумент --driver-class-path в spark-shell или spark-submit, чтобы указать на правильную версию библиотеки commons-compress.

spark-submit 
    --driver-class-path ~/.m2/repository/org/apache/commons/commons-compress/1.18/commons-compress-1.18.jar
    --class {you.main.class}
....

Я столкнулся сточно такая же проблема.Нашел ответ в этом посте введите описание ссылки здесь

...