В рамках моего исследовательского проекта я написал следующее Spark Structured Streaming-приложение для чтения данных из топика Kafka:
package twitterBatch.twitterBatch
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types._
/**
* @author theirman
*/
object App
{
  /**
   * Entry point: builds a local SparkSession, declares the expected tweet
   * schema, opens a streaming DataFrame on the Kafka topic "twitter" and
   * prints the DataFrame schema.
   *
   * NOTE(review): printSchema() will show Kafka's fixed record schema
   * (key, value, topic, partition, offset, ...). The declared `schema` is
   * never applied here; to use it, parse the message value, e.g.
   * from_json(col("value").cast("string"), schema).
   */
  def main(args: Array[String]): Unit = {
    // Create the Spark session (local master for development).
    println("je crée mon spark session")
    val spark = SparkSession.builder
      .appName("twitterBatchConsumer")
      .master("local")
      .getOrCreate()

    // Schema of the JSON tweets carried in the Kafka message value.
    println("je définis mon schéma")
    val schema = StructType(
      Array(
        StructField("id", LongType),
        StructField("timestamp", StringType),
        StructField("lang", StringType),
        StructField("text", StringType),
        StructField("hashtag", StringType)
      )
    )

    // Streaming source reading the "twitter" topic from the beginning.
    println("je crée mon dataframe")
    val dfKafka = spark.readStream
      .format("kafka")
      // BUG FIX: the option was misspelled "kafka.boostrap.servers";
      // the Kafka source requires "kafka.bootstrap.servers" and fails
      // without it.
      .option("kafka.bootstrap.servers", "node20:9092,node21:9092,node22:9092")
      .option("subscribe", "twitter")
      .option("startingOffsets", "earliest")
      .load()

    // Show the (Kafka record) schema of the streaming DataFrame.
    println("j'affiche mon dataframe")
    dfKafka.printSchema()
  }
}
Я также настроил свой pom.xml следующим образом, а затем собрал JAR-файл командой mvn package:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>twitterBatch</groupId>
  <artifactId>twitterBatch</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.6</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <!-- spark-core / spark-sql / spark-streaming are part of every Spark
         distribution, so they can safely be "provided" on a cluster;
         kept as-is here to avoid changing local-run behavior. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>2.4.4</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <!-- BUG FIX: this artifact was scoped "provided", but it is NOT shipped
         with the Spark distribution. Marking it "provided" excludes it from
         the runtime classpath, which causes
         "Failed to find data source: kafka" at spark-submit time.
         It must be on the default (compile) scope and either bundled into a
         fat jar (maven-shade-plugin) or supplied via
         spark-submit - -packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <pluginManagement>
      <plugins>
        <plugin>
          <!-- see http://davidb.github.com/scala-maven-plugin -->
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.3.2</version>
          <executions>
            <execution>
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
              <configuration>
                <args>
                  <arg>-dependencyfile</arg>
                  <arg>${project.build.directory}/.scala_dependencies</arg>
                </args>
              </configuration>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.21.0</version>
          <configuration>
            <!-- Tests will be run with scalatest-maven-plugin instead -->
            <skipTests>true</skipTests>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.scalatest</groupId>
          <artifactId>scalatest-maven-plugin</artifactId>
          <version>2.0.0</version>
          <configuration>
            <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
            <junitxml>.</junitxml>
            <filereports>TestSuiteReport.txt</filereports>
            <!-- Comma separated list of JUnit test class names to execute -->
            <jUnitClasses>samples.AppTest</jUnitClasses>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
    <!-- BUG FIX: pluginManagement only declares default configuration; a
         plugin must also be listed under build/plugins to actually run.
         Without this section the scala-maven-plugin never executes and
         mvn package produces a jar with no compiled classes. -->
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Пока всё идёт нормально. Затем я копирую JAR-файл через scp на свой Spark-кластер, и при запуске получаю следующую ошибку:
spark-submit --class twitterBatch.twitterBatch.App twitterBatch-0.0.1-SNAPSHOT.jar
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at twitterBatch.twitterBatch.App$.main(App.scala:40)
at twitterBatch.twitterBatch.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Затем я добавляю свои зависимости с помощью опции --packages и получаю новую ошибку:
spark-submit --packages org.apache.spark:spark-core_2.12:2.4.4,org.apache.kafka:kafka-clients:2.4.0,org.apache.spark:spark-streaming_2.12:2.4.4,org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.4,org.apache.spark:spark-sql_2.12:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 --class twitterBatch.twitterBatch.App twitterBatch-0.0.1-SNAPSHOT.jar
:: problems summary ::
:::: WARNINGS
[NOT FOUND ] com.github.luben#zstd-jni;1.4.3-1!zstd-jni.jar (2ms)
==== local-m2-cache: tried
file:/root/.m2/repository/com/github/luben/zstd-jni/1.4.3-1/zstd-jni-1.4.3-1.jar
[NOT FOUND ] org.lz4#lz4-java;1.6.0!lz4-java.jar (0ms)
==== local-m2-cache: tried
file:/root/.m2/repository/org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar
[NOT FOUND ] org.slf4j#slf4j-api;1.7.28!slf4j-api.jar (0ms)
==== local-m2-cache: tried
file:/root/.m2/repository/org/slf4j/slf4j-api/1.7.28/slf4j-api-1.7.28.jar
::::::::::::::::::::::::::::::::::::::::::::::
:: FAILED DOWNLOADS ::
:: ^ see resolution messages for details ^ ::
::::::::::::::::::::::::::::::::::::::::::::::
:: com.github.luben#zstd-jni;1.4.3-1!zstd-jni.jar
:: org.lz4#lz4-java;1.6.0!lz4-java.jar
:: org.slf4j#slf4j-api;1.7.28!slf4j-api.jar
::::::::::::::::::::::::::::::::::::::::::::::
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [download failed: com.github.luben#zstd-jni;1.4.3-1!zstd-jni.jar, download failed: org.lz4#lz4-java;1.6.0!lz4-java.jar, download failed: org.slf4j#slf4j-api;1.7.28!slf4j-api.jar]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1302)
at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:304)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:774)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
ОК, тогда я добавляю эти три новые зависимости в pom.xml и в параметр --packages, но снова получаю ошибку:
spark-submit --packages org.apache.spark:spark-core_2.12:2.4.4,org.apache.kafka:kafka-clients:2.4.0,org.apache.spark:spark-streaming_2.12:2.4.4,org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.4,org.apache.spark:spark-sql_2.12:2.4.4,org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4,com.github.luben:zstd-ini:1.4.3-2,org.lz4:lz4-java:1.6.0,org.slf4j:slf4j-api:1.7.28 --class twitterBatch.twitterBatch.App twitterBatch-0.0.1-SNAPSHOT.jar
:: problems summary ::
:::: WARNINGS
module not found: com.github.luben#zstd-ini;1.4.3-2
==== local-m2-cache: tried
file:/root/.m2/repository/com/github/luben/zstd-ini/1.4.3-2/zstd-ini-1.4.3-2.pom
-- artifact com.github.luben#zstd-ini;1.4.3-2!zstd-ini.jar:
file:/root/.m2/repository/com/github/luben/zstd-ini/1.4.3-2/zstd-ini-1.4.3-2.jar
==== local-ivy-cache: tried
/root/.ivy2/local/com.github.luben/zstd-ini/1.4.3-2/ivys/ivy.xml
-- artifact com.github.luben#zstd-ini;1.4.3-2!zstd-ini.jar:
/root/.ivy2/local/com.github.luben/zstd-ini/1.4.3-2/jars/zstd-ini.jar
==== central: tried
https://repo1.maven.org/maven2/com/github/luben/zstd-ini/1.4.3-2/zstd-ini-1.4.3-2.pom
-- artifact com.github.luben#zstd-ini;1.4.3-2!zstd-ini.jar:
https://repo1.maven.org/maven2/com/github/luben/zstd-ini/1.4.3-2/zstd-ini-1.4.3-2.jar
==== spark-packages: tried
https://dl.bintray.com/spark-packages/maven/com/github/luben/zstd-ini/1.4.3-2/zstd-ini-1.4.3-2.pom
-- artifact com.github.luben#zstd-ini;1.4.3-2!zstd-ini.jar:
https://dl.bintray.com/spark-packages/maven/com/github/luben/zstd-ini/1.4.3-2/zstd-ini-1.4.3-2.jar
[NOT FOUND ] org.lz4#lz4-java;1.6.0!lz4-java.jar (0ms)
==== local-m2-cache: tried
file:/root/.m2/repository/org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar
[NOT FOUND ] org.slf4j#slf4j-api;1.7.28!slf4j-api.jar (0ms)
==== local-m2-cache: tried
file:/root/.m2/repository/org/slf4j/slf4j-api/1.7.28/slf4j-api-1.7.28.jar
[NOT FOUND ] com.github.luben#zstd-jni;1.4.3-1!zstd-jni.jar (0ms)
==== local-m2-cache: tried
file:/root/.m2/repository/com/github/luben/zstd-jni/1.4.3-1/zstd-jni-1.4.3-1.jar
::::::::::::::::::::::::::::::::::::::::::::::
:: UNRESOLVED DEPENDENCIES ::
::::::::::::::::::::::::::::::::::::::::::::::
:: com.github.luben#zstd-ini;1.4.3-2: not found
::::::::::::::::::::::::::::::::::::::::::::::
::::::::::::::::::::::::::::::::::::::::::::::
:: FAILED DOWNLOADS ::
:: ^ see resolution messages for details ^ ::
::::::::::::::::::::::::::::::::::::::::::::::
:: com.github.luben#zstd-jni;1.4.3-1!zstd-jni.jar
:: org.lz4#lz4-java;1.6.0!lz4-java.jar
:: org.slf4j#slf4j-api;1.7.28!slf4j-api.jar
::::::::::::::::::::::::::::::::::::::::::::::
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: com.github.luben#zstd-ini;1.4.3-2: not found, download failed: com.github.luben#zstd-jni;1.4.3-1!zstd-jni.jar, download failed: org.lz4#lz4-java;1.6.0!lz4-java.jar, download failed: org.slf4j#slf4j-api;1.7.28!slf4j-api.jar]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1302)
at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:54)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:304)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:774)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Я не знаю, что делать дальше! Может кто-нибудь мне помочь, пожалуйста? Спасибо, Тьерри.