Как выполнить пакетную вставку в hbase с помощью saveAsNewAPIHadoopDataset - PullRequest
0 голосов
/ 11 мая 2018

просто изучите искру на некоторое время found я нашел api: saveAsNewAPIHadoopDataset, когда я использую hbase, код, как показано ниже , насколько это известно , этот код может вставлять по одной строке за раз, как изменить его на пакетный ввод? я новичок .. пожалуйста, помогите ... tks

import org.apache.hadoop.hbase.client.Put  
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
import org.apache.hadoop.hbase.client.Result  
import org.apache.hadoop.hbase.util.Bytes  
import org.apache.hadoop.mapreduce.Job  
import org.apache.spark.{SparkContext, SparkConf}  

 /** 
 *
 */  
object HbaseTest2 {  

  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
    val sc = new SparkContext(sparkConf)  

    val tablename = "account"  

    sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")  
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  

    val job = Job.getInstance(sc.hadoopConfiguration)  
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
    job.setOutputValueClass(classOf[Result])  
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])  

    val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  

    val rdd = indataRDD.map(_.split(',')).map{arr=>{  
      val put = new Put(Bytes.toBytes(arr(0)))  
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
      (new ImmutableBytesWritable, put)  
    }}  

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  

    sc.stop()  
  }  
}  

Ответы [ 2 ]

0 голосов
/ 14 сентября 2018

На самом деле вам не нужно беспокоиться об этом - под капотом put(Put) и put(List<Put>) идентичны. Они оба буферизуют сообщения и сбрасывают их порциями. Не должно быть заметной разницы в производительности.

Боюсь, другой ответ ошибочен.

0 голосов
/ 14 мая 2018

saveAsNewAPIHadoopDataset выполняет однократное размещение.

Для массового размещения в таблице hbase вы можете использовать соединитель hbase-spark.Соединитель выполняет bulkPutFunc2 в mapPartition (), поэтому он эффективен.Ваш исходный код изменится, как показано ниже -

object HBaseTest {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
        val sc = new SparkContext(sparkConf)

    val tablename = "account"

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "slave1,slave2,slave3")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("zookeeper.znode.parent", "/hbase")

    val hbaseContext = new HBaseContext(sc, hbaseConf)

    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))

    hbaseContext.bulkPut(indataRDD, TableName.valueOf(tablename), bulkPutFunc2)

    sc.stop()
}
def bulkPutFunc2(arrayRec : String): Put = {
    val rec = arrayRec.split(",")
    val put = new Put(Bytes.toBytes(rec(0).toInt))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(rec(1)))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(rec(2).toInt))

    put
    }

}

pom.xml будет иметь следующую запись -

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-spark</artifactId>
    <version>1.2.0-cdh5.12.1</version>
<dependency>
...