java.lang.RuntimeException: com.datastax.bdp.fs.model.NoSuchFileException: файл не найден: / tmp / hive / - PullRequest
0 голосов
/ 21 мая 2018

У меня следующий код:

def main(args: Array[String]) {

val conf = new SparkConf()
  .setAppName("Fleet")
  .set("spark.executor.memory", "1g")
  .set("spark.driver.memory", "2g")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "4")
  .set("spark.executor.cores", "3")
  .set("spark.cores.max", "12")
  .set("spark.driver.cores", "4")
  .set("spark.ui.port", "4040")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "30")

val spark = SparkSession
  .builder
  .appName("Fleet")
  .config("spark.cassandra.connection.host", "192.168.0.40")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.submit.deployMode", "cluster")
  .master("local[*]")
  .getOrCreate()

val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
val topics = Map("historyfleet" -> 1) 
val kafkaStream = KafkaUtils.createStream(ssc, "192.168.0.40:2181", "fleetgroup", topics)

kafkaStream.foreachRDD(rdd =>
  {
    val dfs = rdd.toDF()
    println(dfs.show())
    dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "test", "keyspace" -> "test_db")).mode(SaveMode.Append).save()
  })
ssc.start()
ssc.awaitTermination()

}

Я могу выполнить эту программу из Eclipse на локальном компьютере, но при попытке выполнить задание с помощью spark на кластере выдает ошибкукак: -

ERROR 2018-05-21 13:00:27,009 org.apache.spark.deploy.DseSparkSubmitBootstrapper: Failed to start or submit Spark application
java.lang.RuntimeException: com.datastax.bdp.fs.model.NoSuchFileException: File not found: /tmp/hive/
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) ~[hive-exec-1.2.1.spark2.jar:1.2.1.spark2]
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:189) ~[spark-hive_2.11-2.0.2.16.jar:2.0.2.16]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_161]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_161]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_161]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_161]
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258) ~[spark-hive_2.11-2.0.2.16.jar:2.0.2.16]
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359) ~[spark-hive_2.11-2.0.2.16.jar:2.0.2.16]

Моя идея - извлечь записи из потока Кафки и отправить данные в Кассандру.Спасибо,

1 Ответ

0 голосов
/ 25 мая 2018

Вам необходимо увеличить коэффициент репликации (rf) для dsefs пространства ключей до значения больше единицы.Кроме того, dsefs пространство клавиш (как и любое другое пространство клавиш) лучше всего работает с NetworkTopologyStrategy.Вот команда, которая меняет стратегию с rf = 3.

ALTER KEYSPACE dsefs WITH replication = {'class':'NetworkTopologyStrategy', '<YOUR DC HERE>': '3'}

После изменения пространства ключей вам нужно запустить восстановление узлов всех узлов.

nodetool repair dsefs

В качестве альтернативы всему этому вы можете удалить /tmp/hive из DSEFS и воссоздать его с помощью

dse fs
mkdir -p -m 733 /tmp/hive
...