Я работал над некоторыми scala кодами для запуска потоковой передачи искры для сбора данных kafka. После обновления всех зависимостей и добавления новых в скомпилированный jar я могу запустить его с помощью spark-submit с кодом spark-submit /codigos/codigos/adquisicion.jar
. Но при запуске через livy2 я получаю исключение при создании потока в:
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
Код, используемый для запуска через livy2:
uri<-"http://192.168.0.28:8999/batches"
jsonBody2 = paste0('{ "file": "/user/bigdata/adquisicion.jar",
"numExecutors":1,
"executorCores":3,
"name": "Adquisición Datos",
"args": [""],
"className": "codigos.adquisicionDatos" }')
request <- httr::POST(url = uri, body = jsonBody2, httr::add_headers(.headers = c('Content-Type' = 'application/json', 'X-Requested-by'='user')))
А полученное исключение следующее:
18/11/06 08:01:00 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/Consumer
java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/Consumer
at org.apache.spark.streaming.kafka010.ConsumerStrategies$.Subscribe(ConsumerStrategy.scala:256)
at codigos.adquisicionDatos$.main(adquisicionDatos.scala:251)
at codigos.adquisicionDatos.main(adquisicionDatos.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:646)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.Consumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 8 more
EDIT:
Файл POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>codigos</groupId>
<artifactId>adquisicionDatos</artifactId>
<version>1.2</version>
<packaging>jar</packaging>
<name>Adquisicion Datos Kafka</name>
<properties>
<app.main.class>adquisicionDatos.SparkKMeansApp</app.main.class>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.core.version>2.3.2</spark.core.version>
<!--hbase.version>0.98.7-hadoop1</hbase.version-->
<hbase.version>1.0.1</hbase.version>
<slf4j.version>1.7.5</slf4j.version>
<guava.version>13.0.1</guava.version>
<gson.version>2.2.4</gson.version>
<spark.version>2.3.2</spark.version>
</properties>
<repositories>
<repository>
<id>scalanlp.org</id>
<name>ScalaNLP Maven2 Repository</name>
<url>http://repo.scalanlp.org/repo</url>
</repository>
<repository>
<id>apache release</id>
<url>https://repository.apache.org/content/repositories/releases/</url>
</repository>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>cloudera.repo</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<name>Cloudera Repositories</name>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.sameersingh.scalaplot/scalaplot -->
<dependency>
<groupId>org.sameersingh.scalaplot</groupId>
<artifactId>scalaplot</artifactId>
<version>0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.mail/javax.mail-api -->
<dependency>
<groupId>javax.mail</groupId>
<artifactId>javax.mail-api</artifactId>
<version>1.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>${spark.version}</version>
<!--<version>2.0.5</version>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>codigos.adquisicionDatos</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Я компилирую его на сервере, а затем копирую в докер (через общий том cp ~/adquisicionDatos/target/adquisicionDatos1.2-jar-with-dependencies.jar /mnt/codigos/adquisicion.jar
) и загружаю его в HDFS, используя сценарий incrond для автоматической загрузки файлов при обновлении (работает, проверено).
При работе с spark-submit под root
user все работает нормально, но с использованием livy или под livy
пользователь не работает и выдает ошибку выше.