Dataframe каждой итерации строки сохранить в Кассандре - PullRequest
0 голосов
/ 30 мая 2018

У меня есть следующий код: -

 def writeToCassandra(cassandraConnector: CassandraConnector) = new ForeachWriter[Row] {
override def process(row: Row): Unit = {
println("row is " + row.toString())}
override def close(errorOrNull: Throwable): Unit = {}

override def open(partitionId: Long, version: Long): Boolean =
  true 
}

val conf = new SparkConf()
  .setAppName("Data")
  .set("spark.cassandra.connection.host", "192.168.0.40,192.168.0.106,192.168.0.113")
  .set("spark.cassandra.connection.keep_alive_ms", "20000")
   .set("spark.executor.memory", "1g")
  .set("spark.driver.memory", "2g")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "9")
  .set("spark.executor.cores", "1")
  .set("spark.cores.max", "9")
  .set("spark.driver.cores", "3")
  .set("spark.ui.port", "4040")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.speculation", "true")

println("Spark Configuration Done")
val spark = SparkSession
  .builder
  .appName("Data")
  .config(conf)
  .master("local[2]")
  .getOrCreate()
println("Spark Session Config Done")

val cassandraConnector = CassandraConnector(conf)
import spark.implicits._
import org.apache.spark.sql.streaming.OutputMode
val dataStream =
  spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.0.78:9092,192.168.0.78:9093,192.168.0.78:9094")
    .option("subscribe", "historyfleet")
    .load()

val query =
  dataStream
    .writeStream
    .outputMode(OutputMode.Append())
    .foreach(writeToCassandra(cassandraConnector))
    .format("console")
    .start()

query.awaitTermination()
query.stop()

Выдает ошибку времени выполнения как: -

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/streaming/Source$class
at org.apache.spark.sql.kafka010.KafkaSource.<init>(KafkaSource.scala:80)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:94)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$1.applyOrElse(StreamingQueryManager.scala:245)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$1.applyOrElse(StreamingQueryManager.scala:241)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.streaming.Source$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more

Мое приложение занимает некоторое время, чтобы вставить DataFrame в Cassandra, поэтому я пытаюсьчтобы проверить, будет ли одна итерация ускорять мою производительность, но она выдаёт ошибку выше.Использование 3-х узлового кластера - 12 исполнителей по 1 ядру каждый.это дает 6000 вставок в секунду в Кассандре.нужно оптимизировать это.Любые предложения, пожалуйста.Спасибо,

...