Мой предыдущий пост: Восстановление подготовленного предупреждения STMT .
Я не смог решить ее, с несколькими предложениями, я попытался использовать разъем искры кассандры, чтобы решить мою проблему.
Но я совершенно запутался по поводу его использования в моем приложении.
Я пытался написать код, как показано ниже, но не уверен, как именно использовать API.
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "1.1.1.1")
.set("spark.cassandra.auth.username", "auser")
.set("spark.cassandra.auth.password", "apass")
.set("spark.cassandra.connection.port","9042")
val sc=new SparkContext(conf)
val c = CassandraConnector(sc.getConf)
c.withSessionDo ( session => session.prepareStatement(session,insertQuery)
val boundStatement = new BoundStatement(insertStatement)
batch.add(boundStatement.bind(data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
)
def prepareStatement(session: Session, query: String): PreparedStatement = {
val cluster = session.clustername
get(cluster, query.toString) match {
case Some(stmt) => stmt
case None =>
synchronized {
get(cluster, query.toString) match {
case Some(stmt) => stmt
case None =>
val stmt = session.prepare(query)
put(cluster, query.toString, stmt)
}
}
}
}
-----------------------------------------------------------------------------------------OR
val table1 = spark.read
.format("org.apache.spark.sql.cassandra")
.option( "spark.cassandra.auth.username","apoch_user")
.option("spark.cassandra.auth.password","Apoch#123")
.options(Map(
"table" -> "trip_summary_data",
"keyspace" -> "aphoc" ,
"cluster" -> "Cluster1"
) ).load()
def insert( data: TripHistoryData) {
table1.createOrReplaceTempView("inputTable1");
val df1= spark.sql("select * from inputTable1 where service_id = ? and asset_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?");
val df2=spark.sql("insert into inputTable1 values (data.service_id, data.asset_id, data.summ_typ, data.summ_dt, data.trp_summ_id, data.asset_serial_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.device_id, data.device_serial_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
}