в спарк, если вы обновляете Hbase из исполнителей, а не из драйверов, то создайте соединение для каждого исполнителя, поэтому соединение будет повторно использоваться в том же самом исполнителе.таким образом, вы можете использовать то же время на создание соединения, но создать объект таблицы для каждого потока, потому что объект таблицы hbase не является потокобезопасным (см. официальный документ клиента Hbase).
и, конечно, закрывайте таблицу и соединение всякий раз, когда высделано.
// this object will make connection reside on executor side not Driver
// and serve as a singleton per executor JVM, which makes connection shared between executor threads, connection is thread-safe!
object HbaseHandler {
var connection: Option[Connection] = None
def put(put: Put): Unit = {
if(connection.isEmpty) {
val conn = ConnectionFactory.createConnection(HBaseConfiguration.create(configuration))
connection = Some(conn)
}
connection.get.<do some stuff>
val table = ...
try {
table.put(put)
} finally {
table.close()
}
}
}
...
rdd.foreach (
row => {
val put: Put = <generate put object>
HbaseHandler.put(put)
}
)
=========== как в примере кода выше =========
object Hbase {
private var hbaseConnection: Option[Connection] = None
private def connection: Connection = {
if(hbaseConnection.isEmpty) {
hbaseConnection = Some(ConnectionFactory.createConnection(HBaseConfiguration.create(configuration)))
}
hbaseConnection.get
}
def extractInfo(fp: String) = {
val p: Parser = new AutoDetectParser()
val fs = FileSystem.get(new java.net.URI("XXXXXXXXXX"), new Configuration())
val inputPath: Path = new Path(fp)
val is: InputStream = fs.open(inputPath)
val handler: BodyContentHandler = new BodyContentHandler(-1)
val metadata: Metadata = new Metadata()
try {
p.parse(is, handler, metadata, new ParseContext())
is.close()
val hand = handler.toString()
val gson = new Gson
val jsonTree = gson.toJsonTree(metadata)
val metaNode = jsonTree.getAsJsonObject().getAsJsonObject("metadata")
val jsonString = gson.toJson(metaNode)
if (hand.trim().isEmpty()) {
println("no Text extracted", inputPath)
} else {
println("Success")
}
val fname = "ABC"
val configuration: Configuration = HBaseConfiguration.create()
configuration.set("hbase.zookeeper.quorum", "XXXX")
configuration.set("hbase.zookeeper.property.clientPort", "XXXX")
configuration.set("zookeeper.znode.parent", "/hbase-XXX")
configuration.set("hbase.client.keyvalue.maxsize", "0")
val principal = System.getProperty("kerberosPrincipal", "XXXXX")
val keytabLocation = System.getProperty("kerberosKeytab", "XXXXXXXXX")
UserGroupInformation.setConfiguration(configuration)
UserGroupInformation.loginUserFromKeytab(principal, keytabLocation)
val admin = connection.getAdmin
val hTable: HTable = new HTable(configuration, "XXXXXXXXX")
val g = new Put(Bytes.toBytes(fname))
g.add(Bytes.toBytes("txt"), Bytes.toBytes("text"), Bytes.toBytes(hand))
hTable.put(g)
val m = new Put(Bytes.toBytes(fname))
m.add(Bytes.toBytes("data"), Bytes.toBytes("info"), Bytes.toBytes(jsonString))
hTable.put(m)
hTable.close()
fs.close()
}
catch {
case e: Throwable => {
println(e.printStackTrace)
}
}
}
}
object App {
def main(args : Array[String]) {
val fnames = "/X/X/XXXXX.XXX"
fnames.foreach{x => Hbase.extractInfo(x) }
}
}