Как интегрировать Spark с доступом к нескольким хостам Cassandra и разным таблицам, чтобы работать с разными методами - PullRequest
0 голосов
/ 26 сентября 2018

Я пытаюсь интегрировать Spark с Cassandra. У меня есть несколько методов, которым нужен доступ к разным хостам Cassandra и разным таблицам. Я не могу найти, как с этим справиться. Пожалуйста, помогите мне решить эту проблему.

Ниже приведен код программы:

/** Inserts a single trip summary record into the `ap.trip_summary_data` table.
  *
  * NOTE(review): the write goes to whichever cluster `sc` (via its implicit
  * CassandraConnector) is configured for — confirm it hosts the `ap` keyspace.
  *
  * @param data the trip history record to persist
  */
def insert(data: TripHistoryData): Unit = {
  // Single-record RDD; `val` — the reference is never reassigned.
  val records = sc.parallelize(Seq(data))
  records.saveToCassandra(
    "ap",
    "trip_summary_data",
    SomeColumns(
      "service_id", "asset_id", "summ_typ", "summ_dt", "trp_summ_id",
      "asset_serial_no", "avg_sp", "c_dist", "c_epa", "c_gal", "c_mil",
      "device_id", "device_serial_no", "dist", "en_addr", "en_dt", "en_lat",
      "en_long", "epa", "gal", "h_dist", "h_epa", "h_gal", "h_mil", "id_tm",
      "max_sp", "mil", "rec_crt_dt", "st_addr", "st_lat", "st_long",
      "tr_dis", "tr_dt", "tr_dur"))
}


 /** Updates (upserts) a trip summary record in the `ap.trip_summary_data`
   * table. In Cassandra a write to an existing primary key is an upsert,
   * so the column set matches `insert`.
   *
   * @param data the trip history record to persist
   */
 def update(data: TripHistoryData): Unit = {
   // Single-record RDD; `val` — the reference is never reassigned.
   val records = sc.parallelize(Seq(data))
   records.saveToCassandra(
     "ap",
     "trip_summary_data",
     SomeColumns(
       "service_id", "asset_id", "summ_typ", "summ_dt", "trp_summ_id",
       "asset_serial_no", "avg_sp", "c_dist", "c_epa", "c_gal", "c_mil",
       "device_id", "device_serial_no", "dist", "en_addr", "en_dt", "en_lat",
       "en_long", "epa", "gal", "h_dist", "h_epa", "h_gal", "h_mil", "id_tm",
       "max_sp", "mil", "rec_crt_dt", "st_addr", "st_lat", "st_long",
       "tr_dis", "tr_dt", "tr_dur"))
 }

/** Inserts a dashboard record into the `ap.asset_dashboard_data` table.
  *
  * NOTE(review): the stack trace ("Couldn't find ap.asset_dashboard_data")
  * indicates `sc`'s default CassandraConnector points at a cluster that does
  * not contain this table — this method needs a connector for the cluster
  * that actually hosts it (see the accepted answer).
  *
  * @param data the trip history record whose dashboard fields are persisted
  */
def dashBoardInsert(data: TripHistoryData): Unit = {
  // Single-record RDD; `val` — the reference is never reassigned.
  val records = sc.parallelize(Seq(data))
  records.saveToCassandra(
    "ap",
    "asset_dashboard_data",
    SomeColumns(
      "service_id", "asset_id", "hlth_typ", "hlth_s_typ",
      // Map the Cassandra column to the case-class property name.
      "asset_serial_no" as "assetSerialNo",
      "dsh_nval_01", "dsh_nval_02", "dsh_val_01", "dsh_val_02",
      "hlth_col_ind", "lst_rfr_dt", "rec_crt_dt"))
}

/** Updates (upserts) a dashboard record in the `ap.asset_dashboard_data`
  * table. Cassandra writes are upserts, so the column set matches
  * `dashBoardInsert`.
  *
  * @param data the trip history record whose dashboard fields are persisted
  */
def dashBoardUpdate(data: TripHistoryData): Unit = {
  // Single-record RDD; `val` — the reference is never reassigned.
  val records = sc.parallelize(Seq(data))
  records.saveToCassandra(
    "ap",
    "asset_dashboard_data",
    SomeColumns(
      "service_id", "asset_id", "hlth_typ", "hlth_s_typ",
      // Map the Cassandra column to the case-class property name.
      "asset_serial_no" as "assetSerialNo",
      "dsh_nval_01", "dsh_nval_02", "dsh_val_01", "dsh_val_02",
      "hlth_col_ind", "lst_rfr_dt", "rec_crt_dt"))
}

Ошибка StackTrace:

18/09/26 21:27:41 ERROR app.ProcessMPacket$: error for processing this event For M-packet
java.io.IOException: Couldn't find ap.asset_dashboard_data or any similarly named keyspace and table pairs
    at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:358)
    at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:379)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
    at com.vzt.afm.hum.dh.util.CassandraUtils$.dashBoardInsert(CassandraUtils.scala:275)
    at com.vzt.afm.hum.dh.app.TripAggregation$.updateOdometer(TripAggregation.scala:86)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:176)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1$$anonfun$apply$1.apply(ProcessMPacket.scala:129)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:129)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Ответ

0 голосов
/ 16 октября 2018

Это описано в документации Spark Cassandra Connector. По сути, вам нужно создать отдельные экземпляры класса CassandraConnector с разными конфигурациями Cassandra — как минимум с разными spark.cassandra.connection.host — а затем переопределять неявное значение (implicit val) правильной конфигурацией. Вот пример из документации:

// One CassandraConnector per cluster, each built from the Spark conf
// with that cluster's connection host.
val clusterOneConnector = CassandraConnector(
  sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
val clusterTwoConnector = CassandraConnector(
  sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

// Reads in this block go to cluster one: the implicit connector in
// scope decides which cluster the table operations talk to.
val rddFromClusterOne = {
  implicit val c = clusterOneConnector
  sc.cassandraTable("ks", "tab")
}

// Writes in this block go to cluster two.
{
  implicit val c = clusterTwoConnector
  rddFromClusterOne.saveToCassandra("ks", "tab")
}

Если вы используете DataFrames, то это ещё проще, поскольку вы можете указать кластер на уровне операций (пример взят из сообщения в блоге Рассела Спитцера):

// Register one connection host per logical cluster name; the
// "<cluster>/" prefix scopes the setting to that cluster.
sqlContext.setConf("ClusterOne/spark.cassandra.connection.host", "127.0.0.1")
sqlContext.setConf("ClusterTwo/spark.cassandra.connection.host", "127.0.0.2")

// Read the source table from ClusterOne.
val sourceDf = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "tab", "keyspace" -> "ks", "cluster" -> "ClusterOne"))
  .load

// Write the same data to ClusterTwo.
sourceDf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "tab", "keyspace" -> "ks", "cluster" -> "ClusterTwo"))
  .save
...