Массовая вставка данных в HBase с использованием структурированной потоковой передачи искр - PullRequest
3 голосов
/ 24 мая 2019

Я читаю данные, поступающие с Kafka (100 000 строк в секунду), используя структурированную потоковую передачу искр, и пытаюсь вставить все данные в HBase.

Я в Cloudera Hadoop 2.6 иЯ использую Spark 2.3

Я попробовал что-то вроде того, что видел здесь .

eventhubs.writeStream
 .foreach(new MyHBaseWriter[Row])
 .option("checkpointLocation", checkpointDir)
 .start()
 .awaitTermination()

MyHBaseWriter выглядит так:

class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
  override def toPut(record: Row): Put = {
    override val tableName: String = "hbase-table-name"

    override def toPut(record: Row): Put = {
        // Get Json
        val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
        val key = data.getOrElse(Map())("key")+ ""
        val val = data.getOrElse(Map())("val")+ ""

        val p = new Put(Bytes.toBytes(key))
        //Add columns ... 
        p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))

        p
     }
    }

И класс HBaseForeachWriter выглядит следующим образом:

trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String

  def pool: Option[ExecutorService] = None

  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _


  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create HBase Connection Here
  }

  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(Variables.getTableName()))
  }

  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }

  def toPut(record: RECORD): Put
}

Так что здесь я делаю построчно, даже если я разрешаю 20 исполнителей и 4 ядра для каждого, у меня нет вставленных данныхсразу в HBase.Поэтому мне нужно выполнить большую загрузку, но я изо всех сил, потому что все, что я нахожу в Интернете, - это реализовать это с помощью RDD и Map / Reduce.

1 Ответ

3 голосов
/ 24 мая 2019

Что я понимаю, так это медленная скорость записи в hbase.У меня есть несколько предложений для вас.

1) hbase.client.write.buffe r .
следующее свойство может помочь вам.

hbase.client.write.buffer

Описание Размер по умолчанию буфера записи BufferedMutator в байтах.Больший буфер требует больше памяти - как на стороне клиента, так и на стороне сервера, так как сервер создает экземпляр переданного буфера записи для его обработки - но больший размер буфера уменьшает количество созданных RPC.Для оценки использования памяти на стороне сервера оцените hbase.client.write.buffer * hbase.regionserver.handler.count

По умолчанию 2097152 (около 2 МБ)

Iпредпочитайте foreachBatch см. документацию по искрам (своего рода foreachPartition в искровом ядре), а не foreach

Также в вашем hbase Writer расширяется ForeachWriter

openметод intialize список массивов put в process, добавьте put в массив списков put в close table.put(listofputs); и затем сбросьте массив, когда вы обновите таблицу ...

что он делает в основном для вашего буфераРазмер, упомянутый выше, заполняется 2 МБ, после чего он помещается в таблицу hbase.до этого записи не будут попадать в таблицу hbase.

вы можете увеличить это до 10 МБ и т. д. Таким образом, количество RPC будет уменьшено.и огромный кусок данных будет сброшен и будет находиться в таблице hbase.

, когда буфер записи заполнен и flushCommits в таблицу hbase запущена.

Пример кода: в моем ответ

2) отключить WAL вы можете отключить WAL (запись в журнал регистрации - Опасность не восстанавливается), но она ускоритсяпишет ... если не хочешь восстановить данные.

Примечание : если вы используете поиск по solr или cloudera в таблицах hbase, вам не следует его отключать, так как Solr будет работать в WAL.если вы отключите его, индексирование Solr не сработает .. это одна из распространенных ошибок, которые делают многие из нас.

Как отключить: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)

, как я уже упоминал, список путов - это хороший способ ... это старый способ (foreachPartition со списком путов) делать до структурированной потоковой передачи, как показано ниже ... где foreachPartitionдействует для каждого раздела, а не для каждого ряда.

def writeHbase(mydataframe: DataFrame) = {
      val columnFamilyName: String = "c"
      mydataframe.foreachPartition(rows => {
        val puts = new util.ArrayList[ Put ]
        rows.foreach(row => {
          val key = row.getAs[ String ]("rowKey")
          val p = new Put(Bytes.toBytes(key))
          val columnV = row.getAs[ Double ]("x")
          val columnT = row.getAs[ Long ]("y")
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("x"),
            Bytes.toBytes(columnX)
          )
          p.addColumn(
            Bytes.toBytes(columnFamilyName),
            Bytes.toBytes("y"),
            Bytes.toBytes(columnY)
          )
          puts.add(p)
        })
        HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
      })
    }

Подводя итог:

Я считаю, что нам необходимо понять психологию искры и основы, чтобы создать эффективную пару.

...