Error when running an apache spark-kafka-hbase jar from cmd
0 votes
/ March 29, 2020

First of all, I'm new to this whole tech stack, so if I don't provide all the details, please let me know.

Here is my problem: I'm trying to build a jar of an apache spark - kafka application. To package my application into a jar I use the sbt-assembly plugin:

addSbtPlugin("com.eed3si9n"     % "sbt-assembly"         % "0.14.10")

The jar is packaged successfully.
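For context, the plugin line above goes in project/plugins.sbt (an assumption about the layout, since the question does not show it), and the fat jar is built with the plugin's assembly task:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

// then, from the project root:
//   sbt assembly
// with the assemblyJarName setting from build.sbt below, the output is typically
//   target/scala-2.11/kafka-consumer.jar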

Now, if I try to run it with:

 spark-submit kafka-consumer.jar

the application starts up fine. I want to do the same with the java -jar command, but unfortunately it does not work.
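To make the comparison concrete, this is roughly what I run (a sketch, not a verbatim copy of my shell history):

# works, presumably because spark-submit puts the Spark/Hadoop distribution jars on the classpath itself
spark-submit kafka-consumer.jar

# fails with "No FileSystem for scheme: file"
java -jar kafka-consumer.jar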

Вот как выглядит стек:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/03/29 11:16:23 INFO SparkContext: Running Spark version 2.4.4
20/03/29 11:16:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/29 11:16:23 INFO SparkContext: Submitted application: KafkaConsumer
20/03/29 11:16:23 INFO SecurityManager: Changing view acls to: popar
20/03/29 11:16:23 INFO SecurityManager: Changing modify acls to: popar
20/03/29 11:16:23 INFO SecurityManager: Changing view acls groups to:
20/03/29 11:16:23 INFO SecurityManager: Changing modify acls groups to:
20/03/29 11:16:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(popar); groups with view permissions: Set(); users  with modify permissions: Set(popar); groups with modify permissions: Set()
20/03/29 11:16:25 INFO Utils: Successfully started service 'sparkDriver' on port 55595.
20/03/29 11:16:25 INFO SparkEnv: Registering MapOutputTracker
20/03/29 11:16:25 INFO SparkEnv: Registering BlockManagerMaster
20/03/29 11:16:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/03/29 11:16:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/03/29 11:16:25 INFO DiskBlockManager: Created local directory at C:\Users\popar\AppData\Local\Temp\blockmgr-77af3fbc-264e-451c-9df3-5b7dda58f3a8
20/03/29 11:16:25 INFO MemoryStore: MemoryStore started with capacity 898.5 MB
20/03/29 11:16:26 INFO SparkEnv: Registering OutputCommitCoordinator
20/03/29 11:16:26 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/03/29 11:16:26 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-0IISN4F.mshome.net:4040
20/03/29 11:16:26 INFO Executor: Starting executor ID driver on host localhost
20/03/29 11:16:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55636.
20/03/29 11:16:26 INFO NettyBlockTransferService: Server created on DESKTOP-0IISN4F.mshome.net:55636
20/03/29 11:16:26 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/03/29 11:16:26 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
20/03/29 11:16:26 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-0IISN4F.mshome.net:55636 with 898.5 MB RAM, BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
20/03/29 11:16:26 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
20/03/29 11:16:26 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-0IISN4F.mshome.net, 55636, None)
Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2809)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2848)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2830)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:181)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
        at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:239)
        at KafkaConsumer$.main(KafkaConsumer.scala:85)
        at KafkaConsumer.main(KafkaConsumer.scala)

As you can see, it fails with: Exception in thread "main" java.io.IOException: No FileSystem for scheme: file. Now, here is the definition of my main class:

import Service._
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions
import scala.util.Try

object KafkaConsumer {

  def setupLogging(): Unit = {
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)
  }

  def persistToHbase[A](rdd: A): Unit = {
    val connection    = getHbaseConnection
    val admin         = connection.getAdmin
    val columnFamily1 = "personal_data"
    val table         = connection.getTable(TableName.valueOf("employees"))

    val scan = new Scan()
    scan.addFamily(columnFamily1.getBytes())

    val totalRows: Int = getLastRowNumber(scan, columnFamily1, table)

    persistRdd(rdd, table, columnFamily1, totalRows + 1)
    Try(table.close())
    Try(admin.close())
    Try(connection.close())
  }

  private def getLastRowNumber[A](scan: Scan,
                                  columnFamily: String,
                                  table: Table): Int = {
    val scanner = table.getScanner(scan)
    val values  = scanner.iterator()
    val seq     = JavaConversions.asScalaIterator(values).toIndexedSeq
    seq.size
  }

  def persistRdd[A](rdd: A,
                    table: Table,
                    columnFamily: String,
                    rowNumber: Int): Unit = {
    val row       = Bytes.toBytes(String.valueOf(rowNumber))
    val put       = new Put(row)
    val qualifier = "test_column"
    put.addColumn(columnFamily.getBytes(),
                  qualifier.getBytes(),
                  String.valueOf(rdd).getBytes())
    table.put(put)
  }

  def main(args: Array[String]): Unit = {
    // create the context with a one second batch of data & uses all the CPU cores
    val context = new StreamingContext("local[*]", "KafkaConsumer", Seconds(1))

    // hostname:port for Kafka brokers
    val kafkaParams = Map("metadata.broker.list" -> "192.168.56.22:9092")

    // list of topics you want to listen from Kafka
    val topics = List("employees").toSet

    setupLogging()

    // create a Kafka Stream, which will contain(topic, message) pairs
    // we take a map(_._2) at the end in order to only get the messages which contain individual lines of data
    val stream: DStream[String] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        context,
        kafkaParams,
        topics)
      .map(_._2)

    // debug print
    stream.print()
//    stream.foreachRDD(rdd => rdd.foreach(persistToHbase(_)))

    context.checkpoint("C:/checkpoint/")
    context.start()
    context.awaitTermination()

  }
}

and build.sbt looks like this:

import sbt._
import Keys._

name := "kafka-consumer"

version := "0.1"

scalaVersion := "2.11.8"

lazy val sparkVersion               = "2.4.4"
lazy val sparkStreamingKafkaVersion = "1.6.3"
lazy val hbaseVersion               = "2.2.1"
lazy val hadoopVersion              = "2.8.0"
lazy val hadoopCoreVersion          = "1.2.1"

resolvers in Global ++= Seq(
  "Sbt plugins" at "https://dl.bintray.com/sbt/sbt-plugin-releases"
)

lazy val commonSettings = Seq(
  version := "0.1",
  organization := "com.rares",
  scalaVersion := "2.11.8",
  test in assembly := {}
)

lazy val excludeJPountz =
  ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

lazy val excludeHadoop =
  ExclusionRule(organization = "org.apache.hadoop")

libraryDependencies ++= Seq(
  "org.apache.spark"  % "spark-core_2.11"            % sparkVersion excludeAll (excludeJPountz, excludeHadoop),
  "org.apache.spark"  % "spark-streaming-kafka_2.11" % sparkStreamingKafkaVersion,
  "org.apache.spark"  % "spark-streaming_2.11"       % sparkVersion excludeAll (excludeJPountz),
  "org.apache.hadoop" % "hadoop-client"              % hadoopVersion,
  "org.apache.hbase"  % "hbase-server"               % hbaseVersion,
  "org.apache.hbase"  % "hbase-client"               % hbaseVersion,
  "org.apache.hbase"  % "hbase-common"               % hbaseVersion
)

//Fat jar
assemblyMergeStrategy in assembly := {
  case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last
  case PathList("javax", "inject", xs @ _*)    => MergeStrategy.last
  case PathList("net", "jpountz", xs @ _*)     => MergeStrategy.last
  case PathList("META-INF", xs @ _*)           => MergeStrategy.discard
  case PathList("jetty-dir.css", xs @ _*)      => MergeStrategy.last
  case PathList("org", "apache", xs @ _*)      => MergeStrategy.last
  case PathList("com", "sun", xs @ _*)         => MergeStrategy.last
  case PathList("hdfs-default.xml", xs @ _*)   => MergeStrategy.last
  case PathList("javax", xs @ _*)              => MergeStrategy.last
  case PathList("mapred-default.xml", xs @ _*) => MergeStrategy.last
  case PathList("core-default.xml", xs @ _*)   => MergeStrategy.last
  case PathList("javax", "servlet", xs @ _*)   => MergeStrategy.last
//  case "git.properties"                             => MergeStrategy.last
//  case PathList("org", "apache", "jasper", xs @ _*) => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

assemblyJarName in assembly := "kafka-consumer.jar"

Any advice would be highly appreciated!

1 Answer

0 votes
/ March 29, 2020

OK, here is what worked for me: add the Hadoop configuration to the Spark context like this:

val hadoopConfiguration = context.sparkContext.hadoopConfiguration
hadoopConfiguration.set(
  "fs.hdfs.impl",
  classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hadoopConfiguration.set(
  "fs.file.impl",
  classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

Works like a charm!
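For context, in the main() from the question this presumably goes right after the StreamingContext is created and before the checkpoint call; a sketch based on the code above, not a verbatim copy of the final version:

def main(args: Array[String]): Unit = {
  val context = new StreamingContext("local[*]", "KafkaConsumer", Seconds(1))

  // Point Hadoop at the FileSystem implementations explicitly. Since the fat jar is
  // built with MergeStrategy.discard for META-INF, the ServiceLoader registrations
  // that would normally resolve the "file" scheme are presumably lost, so set them here.
  val hadoopConfiguration = context.sparkContext.hadoopConfiguration
  hadoopConfiguration.set(
    "fs.hdfs.impl",
    classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  hadoopConfiguration.set(
    "fs.file.impl",
    classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

  // ... Kafka stream setup exactly as in the question ...

  context.checkpoint("C:/checkpoint/")
  context.start()
  context.awaitTermination()
}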

Many thanks to this: hadoop No FileSystem for scheme: file
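An alternative that is often suggested for this error, which I did not try myself (so treat it as an untested assumption), is to stop discarding the META-INF/services entries in the fat jar so that Hadoop's ServiceLoader can still find LocalFileSystem; the rule would have to come before the generic META-INF discard in build.sbt:

assemblyMergeStrategy in assembly := {
  // merge the org.apache.hadoop.fs.FileSystem service files from hadoop-common / hadoop-hdfs
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
  // ... the remaining rules from the question stay as they are ...
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}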
