I have the following code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("Fleet")
    .set("spark.executor.memory", "1g")
    .set("spark.driver.memory", "2g")
    .set("spark.submit.deployMode", "cluster")
    .set("spark.executor.instances", "4")
    .set("spark.executor.cores", "3")
    .set("spark.cores.max", "12")
    .set("spark.driver.cores", "4")
    .set("spark.ui.port", "4040")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.kafka.maxRatePerPartition", "30")

  // SparkSession carries the Cassandra connector settings; note that the
  // master is hard-coded to local[*] while the deploy mode says "cluster".
  val spark = SparkSession
    .builder
    .appName("Fleet")
    .config("spark.cassandra.connection.host", "192.168.0.40")
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.submit.deployMode", "cluster")
    .master("local[*]")
    .getOrCreate()

  // getOrCreate returns the context already created by the SparkSession
  // above, so the settings in conf are effectively ignored here.
  val sc = SparkContext.getOrCreate(conf)
  val ssc = new StreamingContext(sc, Seconds(10))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._ // required for rdd.toDF() below

  // One receiver for the "historyfleet" topic, reading through ZooKeeper.
  val topics = Map("historyfleet" -> 1)
  val kafkaStream = KafkaUtils.createStream(ssc, "192.168.0.40:2181", "fleetgroup", topics)

  kafkaStream.foreachRDD { rdd =>
    val dfs = rdd.toDF()
    dfs.show() // show() already prints the frame and returns Unit, so no println is needed
    dfs.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "test", "keyspace" -> "test_db"))
      .mode(SaveMode.Append)
      .save()
  }

  ssc.start()
  ssc.awaitTermination()
}
I can run this program from Eclipse on my local machine, but submitting the same job to the Spark cluster fails.
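For reference, the submission looks roughly like this (the class name and jar path are placeholders, not the real values):

dse spark-submit --class com.example.Fleet --deploy-mode cluster fleet-assembly-0.1.jar

The submit then aborts with the following error: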
ERROR 2018-05-21 13:00:27,009 org.apache.spark.deploy.DseSparkSubmitBootstrapper: Failed to start or submit Spark application
java.lang.RuntimeException: com.datastax.bdp.fs.model.NoSuchFileException: File not found: /tmp/hive/
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) ~[hive-exec-1.2.1.spark2.jar:1.2.1.spark2]
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:189) ~[spark-hive_2.11-2.0.2.16.jar:2.0.2.16]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_161]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_161]
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258) ~[spark-hive_2.11-2.0.2.16.jar:2.0.2.16]
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359) ~[spark-hive_2.11-2.0.2.16.jar:2.0.2.16]
My goal is simply to pull the records from the Kafka stream and save them to Cassandra.
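Reduced to a minimal sketch, this is the pipeline I am aiming for (assuming the receiver yields (String, String) key/value pairs; the column names "key"/"value" and the test keyspace/table are illustrative, and master/deploy mode are deliberately left to the submit command instead of being hard-coded):

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object FleetSketch {
  def main(args: Array[String]): Unit = {
    // Single SparkConf as the only configuration source.
    val conf = new SparkConf()
      .setAppName("Fleet")
      .set("spark.cassandra.connection.host", "192.168.0.40")

    val ssc = new StreamingContext(conf, Seconds(10))
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    // Receiver-based stream: (key, value) pairs from the "historyfleet" topic.
    val stream = KafkaUtils.createStream(
      ssc, "192.168.0.40:2181", "fleetgroup", Map("historyfleet" -> 1))

    stream.foreachRDD { rdd =>
      rdd.toDF("key", "value")
        .write
        .format("org.apache.spark.sql.cassandra")
        .options(Map("keyspace" -> "test_db", "table" -> "test"))
        .mode(SaveMode.Append)
        .save()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks.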