I wrote a Kafka producer in Scala using IntelliJ and passed it two files as arguments. I used the following code:
package kafkaProducer

import java.util.Properties
import org.apache.kafka.clients.producer._
import org.apache.spark._
import scala.io.Source

object kafkaProducerScala extends App {
  // args(0) = Spark master, args(1) = data file, args(2) = file with threshold values
  val conf = new SparkConf()
    .setMaster(args(0))
    .setAppName("kafkaProducerScala")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  val topic = "KafkaTopics"

  // For each threshold c, send every value a from the data file where a > c
  for (line2 <- Source.fromFile(args(2)).getLines) {
    val c = line2.toInt
    for (line <- Source.fromFile(args(1)).getLines) {
      val a = line.toInt
      if (a > c) {
        println(a)
        val record = new ProducerRecord[String, String](topic, a.toString)
        producer.send(record)
      }
    }
  }
  // Close once, after every threshold has been processed, so later sends do not hit a closed producer
  producer.close()
}
Below is the build.sbt file:
name := "KafkaProducer"
version := "0.1"
scalaVersion := "2.12.7"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.1"
resolvers += Resolver.mavenLocal
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
My goal is to get the output in a Kafka consumer, and that works fine. I then built a .jar file for spark-submit.
I ran the following spark-submit command:
C:\spark-2.3.1-bin-hadoop2.7\bin>spark-submit --class kafkaProducer.kafkaProducerScala C:\Users\Shaheel\IdeaProjects\KafkaProducer\target\scala-2.12\kafkaproducer_2.12-0.1.jar local C:\Users\Shaheel\Desktop\demo.txt C:\Users\Shaheel\Desktop\condition.properties
But I get the following error:
2018-11-28 17:53:58 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/KafkaProducer
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Unknown Source)
at java.lang.Class.privateGetMethodRecursive(Unknown Source)
at java.lang.Class.getMethod0(Unknown Source)
at java.lang.Class.getMethod(Unknown Source)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:42)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.KafkaProducer
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 11 more
2018-11-28 17:53:58 INFO ShutdownHookManager:54 - Shutdown hook called
2018-11-28 17:53:58 INFO ShutdownHookManager:54 - Deleting directory C:\Users\Shaheel\AppData\Local\Temp\spark-96060579-36cc-4c68-b85e-429acad4fd38
Please help me solve this problem.
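For context, a NoClassDefFoundError like this usually means the class was available at compile time but is missing from the runtime classpath. A jar built with plain `sbt package` contains only the project's own classes, not kafka-clients. One direction that might be worth trying is building a fat jar with the sbt-assembly plugin. The following is only a sketch: the plugin version and the merge strategy are assumptions rather than anything taken from this project, and the Scala and Spark versions are lowered on the assumption that the prebuilt spark-2.3.1-bin-hadoop2.7 distribution is compiled against Scala 2.11.

```scala
// project/plugins.sbt (plugin version is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

// build.sbt
name := "KafkaProducer"
version := "0.1"
// Assumption: matches the Scala version of the Spark 2.3.1 binaries used by spark-submit
scalaVersion := "2.11.12"

// Spark is supplied by spark-submit at run time, so it is kept out of the fat jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1" % "provided"
// kafka-clients is not shipped with Spark, so it has to be bundled into the jar
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.1"

// Assumed merge strategy: drop META-INF entries, keep the first copy of any other duplicate file
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

With a setup like this, `sbt assembly` would produce a jar under target\scala-2.11, and that jar, rather than the one from `sbt package`, would be the one passed to spark-submit. Because spark-core is marked "provided", running directly from IntelliJ may then require including provided-scope dependencies in the run configuration.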