First of all, I'm new to this whole tech stack, so if I don't provide all the details, please let me know.
Here is my problem: I'm trying to build a jar of an Apache Spark - Kafka application. To package my application into a jar I'm using the sbt-assembly plugin:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
Packaging the jar works fine.
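For completeness, this is how I build it (the output path is just my assumption based on the scalaVersion and assemblyJarName settings shown further down):
sbt clean assembly
# produces target/scala-2.11/kafka-consumer.jar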
Now, if I try to run it with:
spark-submit kafka-consumer.jar
the application starts successfully. I want to do the same with the java -jar command, but unfortunately it doesn't work.
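To be precise, the invocation that fails is simply the following, with no extra classpath entries (which may well be part of the problem):
java -jar kafka-consumer.jar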
This is what the stack trace looks like:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/03/29 11:16:23 INFO SparkContext: Running Spark version 2.4.4
20/03/29 11:16:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/29 11:16:23 INFO SparkContext: Submitted application: KafkaConsumer
20/03/29 11:16:23 INFO SecurityManager: Changing view acls to: popar
20/03/29 11:16:23 INFO SecurityManager: Changing modify acls to: popar
20/03/29 11:16:23 INFO SecurityManager: Changing view acls groups to:
20/03/29 11:16:23 INFO SecurityManager: Changing modify acls groups to:
20/03/29 11:16:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(popar); groups with view permissions: Set(); users with modify permissions: Set(popar); groups with modify permissions: Set()
20/03/29 11:16:25 INFO Utils: Successfully started service 'sparkDriver' on port 55595.
20/03/29 11:16:25 INFO SparkEnv: Registering MapOutputTracker
20/03/29 11:16:25 INFO SparkEnv: Registering BlockManagerMaster
20/03/29 11:16:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/03/29 11:16:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/03/29 11:16:25 INFO DiskBlockManager: Created local directory at C:\Users\popar\AppData\Local\Temp\blockmgr-77af3fbc-264e-451c-9df3-5b7dda58f3a8
20/03/29 11:16:25 INFO MemoryStore: MemoryStore started with capacity 898.5 MB
20/03/29 11:16:26 INFO SparkEnv: Registering OutputCommitCoordinator
20/03/29 11:16:26 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/03/29 11:16:26 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-0IISN4F.mshome.net:4040
20/03/29 11:16:26 INFO Executor: Starting executor ID driver on host localhost
20/03/29 11:16:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55636.
20/03/29 11:16:26 INFO NettyBlockTransferService: Server created on DESKTOP-0IISN4F.mshome.net:55636
20/03/29 11:16:26 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/03/29 11:16:26 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
20/03/29 11:16:26 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-0IISN4F.mshome.net:55636 with 898.5 MB RAM, BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
20/03/29 11:16:26 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
20/03/29 11:16:26 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2809)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2848)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2830)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:181)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:239)
at KafkaConsumer$.main(KafkaConsumer.scala:85)
at KafkaConsumer.main(KafkaConsumer.scala)
As you can see, it fails with: Exception in thread "main" java.io.IOException: No FileSystem for scheme: file. Here is my main class:
import Service._
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.JavaConversions
import scala.util.Try
object KafkaConsumer {

  def setupLogging(): Unit = {
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)
  }

  def persistToHbase[A](rdd: A): Unit = {
    val connection = getHbaseConnection
    val admin = connection.getAdmin
    val columnFamily1 = "personal_data"
    val table = connection.getTable(TableName.valueOf("employees"))

    val scan = new Scan()
    scan.addFamily(columnFamily1.getBytes())
    val totalRows: Int = getLastRowNumber(scan, columnFamily1, table)

    persistRdd(rdd, table, columnFamily1, totalRows + 1)

    Try(table.close())
    Try(admin.close())
    Try(connection.close())
  }

  private def getLastRowNumber[A](scan: Scan,
                                  columnFamily: String,
                                  table: Table): Int = {
    val scanner = table.getScanner(scan)
    val values = scanner.iterator()
    val seq = JavaConversions.asScalaIterator(values).toIndexedSeq
    seq.size
  }

  def persistRdd[A](rdd: A,
                    table: Table,
                    columnFamily: String,
                    rowNumber: Int): Unit = {
    val row = Bytes.toBytes(String.valueOf(rowNumber))
    val put = new Put(row)
    val qualifier = "test_column"
    put.addColumn(columnFamily.getBytes(),
                  qualifier.getBytes(),
                  String.valueOf(rdd).getBytes())
    table.put(put)
  }

  def main(args: Array[String]): Unit = {
    // create the context with a one second batch of data & uses all the CPU cores
    val context = new StreamingContext("local[*]", "KafkaConsumer", Seconds(1))

    // hostname:port for Kafka brokers
    val kafkaParams = Map("metadata.broker.list" -> "192.168.56.22:9092")

    // list of topics you want to listen from Kafka
    val topics = List("employees").toSet

    setupLogging()

    // create a Kafka Stream, which will contain (topic, message) pairs
    // we take a map(_._2) at the end in order to only get the messages which contain individual lines of data
    val stream: DStream[String] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        context,
        kafkaParams,
        topics)
      .map(_._2)

    // debug print
    stream.print()

    // stream.foreachRDD(rdd => rdd.foreach(persistToHbase(_)))

    context.checkpoint("C:/checkpoint/")

    context.start()
    context.awaitTermination()
  }
}
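The exception comes from the context.checkpoint call (KafkaConsumer.scala:85 in the stack trace above). One workaround I have seen mentioned, but have not verified myself, is to register the implementation for the file:// scheme explicitly on the Hadoop configuration before checkpointing; a rough sketch of what I mean:
// Untested sketch: register the local FileSystem implementation explicitly,
// so the lookup does not rely on the META-INF/services entries inside the fat jar.
val hadoopConf = context.sparkContext.hadoopConfiguration
hadoopConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
// (fs.hdfs.impl would need the same treatment if I were checkpointing to HDFS)
context.checkpoint("C:/checkpoint/")
Is that the right direction, or is the real problem in how the jar is assembled?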
The build.sbt looks like this:
import sbt._
import Keys._
name := "kafka-consumer"
version := "0.1"
scalaVersion := "2.11.8"
lazy val sparkVersion = "2.4.4"
lazy val sparkStreamingKafkaVersion = "1.6.3"
lazy val hbaseVersion = "2.2.1"
lazy val hadoopVersion = "2.8.0"
lazy val hadoopCoreVersion = "1.2.1"
resolvers in Global ++= Seq(
  "Sbt plugins" at "https://dl.bintray.com/sbt/sbt-plugin-releases"
)

lazy val commonSettings = Seq(
  version := "0.1",
  organization := "com.rares",
  scalaVersion := "2.11.8",
  test in assembly := {}
)

lazy val excludeJPountz =
  ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

lazy val excludeHadoop =
  ExclusionRule(organization = "org.apache.hadoop")

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion excludeAll (excludeJPountz, excludeHadoop),
  "org.apache.spark" % "spark-streaming-kafka_2.11" % sparkStreamingKafkaVersion,
  "org.apache.spark" % "spark-streaming_2.11" % sparkVersion excludeAll (excludeJPountz),
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
  "org.apache.hbase" % "hbase-server" % hbaseVersion,
  "org.apache.hbase" % "hbase-client" % hbaseVersion,
  "org.apache.hbase" % "hbase-common" % hbaseVersion
)

// Fat jar
assemblyMergeStrategy in assembly := {
  case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last
  case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
  case PathList("net", "jpountz", xs @ _*) => MergeStrategy.last
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case PathList("jetty-dir.css", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("com", "sun", xs @ _*) => MergeStrategy.last
  case PathList("hdfs-default.xml", xs @ _*) => MergeStrategy.last
  case PathList("javax", xs @ _*) => MergeStrategy.last
  case PathList("mapred-default.xml", xs @ _*) => MergeStrategy.last
  case PathList("core-default.xml", xs @ _*) => MergeStrategy.last
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  // case "git.properties" => MergeStrategy.last
  // case PathList("org", "apache", "jasper", xs @ _*) => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
assemblyJarName in assembly := "kafka-consumer.jar"
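My current suspicion is that the blanket case PathList("META-INF", xs @ _*) => MergeStrategy.discard also drops META-INF/services/org.apache.hadoop.fs.FileSystem, which Hadoop uses to discover the implementation for the file:// scheme. Something like the following is what I'm considering, though I haven't confirmed that it fixes the error (only the changed META-INF handling is shown; the other cases from my strategy above would stay as they are):
// Untested idea: keep and deduplicate the ServiceLoader registration files
// instead of discarding everything under META-INF.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // ... remaining cases unchanged ...
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}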
Any advice would be highly appreciated!