Какой лучший способ открыть и закрыть соединение на hbase с искровой потоковой передачей?
Я сделал следующий код, но я не знаю, является ли правильным:
object hbaseConnection extends Serializable {
lazy val connection = {
val config = HBaseConfiguration.create();
val connection = ConnectionFactory.createConnection(config);
val table = connection.getTable(TableName.valueOf("smartcommerce:teste"))
table
}
}
Я использую Lazyсоединение в foreachpartition, но я не знаю, где закрыто соединение.
object StreamApp extends Serializable {
def main(args: Array[String]): Unit = {
val c = new Converter();
val ss = SparkSession.builder()
.config("spark.dynamicAllocation.enabled", true)
.config("spark.shuffle.service.enabled", true).config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict").enableHiveSupport().getOrCreate();
val sc = ss.sparkContext
val sqlContext = new SQLContext(sc)
val conf = new SparkConf().setAppName("Simple Streaming Application")
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map[String, Object](
// "bootstrap.servers" -> "hnodevp024.prd.bigdata.dc.nova:6667,hnodevp025.prd.bigdata.dc.nova:6667,hnodevp026.prd.bigdata.dc.nova:6667,hnodevp027.prd.bigdata.dc.nova:6667,hnodevp028.prd.bigdata.dc.nova:6667,hnodevp029.prd.bigdata.dc.nova:6667",
"bootstrap.servers" -> "hnodevh002.hlg.bigdata.dc.nova:6667,hnodevh005.hlg.bigdata.dc.nova:6667",
"group.id" -> "pedidos_stream",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"security.protocol" -> "PLAINTEXTSASL",
"enable.auto.commit" -> (false: java.lang.Boolean))
val topics = Array("TESTE_STREAMING")
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
val t = stream.map(m => c.jsonConverter(m.value()))
t.foreachRDD { rdd =>
import sqlContext.implicits._
val df = rdd.toDF()
rdd.foreachPartition { fpartition =>
val table = connection
fpartition.foreach { f =>
val put = new Put(Bytes.toBytes(f.rowkey.toString()))
put.addColumn(Bytes.toBytes("dados"), Bytes.toBytes("idCompra"), Bytes.toBytes(f.IdCompra.toString()))
put.addColumn(Bytes.toBytes("dados"), Bytes.toBytes("IdCompraEntrega"), Bytes.toBytes(f.IdCompraEntrega.toString()))
put.addColumn(Bytes.toBytes("dados"), Bytes.toBytes("IdFreteEntregaTipo"), Bytes.toBytes(f.IdFreteEntregaTipo.toString()))
table.put(put)
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
В spark docs я не очень хорошо понял!Пожалуйста, помогите мне!
Пока