saveAsNewAPIHadoopDataset выполняет однократное размещение.
Для массового размещения в таблице hbase вы можете использовать соединитель hbase-spark.Соединитель выполняет bulkPutFunc2 в mapPartition (), поэтому он эффективен.Ваш исходный код изменится, как показано ниже -
object HBaseTest {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "account"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set("zookeeper.znode.parent", "/hbase")
val hbaseContext = new HBaseContext(sc, hbaseConf)
val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))
hbaseContext.bulkPut(indataRDD, TableName.valueOf(tablename), bulkPutFunc2)
sc.stop()
}
def bulkPutFunc2(arrayRec : String): Put = {
val rec = arrayRec.split(",")
val put = new Put(Bytes.toBytes(rec(0).toInt))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(rec(1)))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(rec(2).toInt))
put
}
}
pom.xml будет иметь следующую запись -
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark</artifactId>
<version>1.2.0-cdh5.12.1</version>
<dependency>