Как напрямую редактировать HBase HFile с помощью Spark без HBase API - PullRequest
0 голосов
/ 24 января 2019

Мне нужно массово редактировать данные HBase, редактируя содержимое конкретной ячейки для каждой строки. Пропускать через HBase PUT / GET API нельзя, так как это будет очень медленно. Я хотел бы настроить задачу Spark, которая загружает HBase HFile в правильно определенные DF, позволяет редактировать данные в определенных столбцах, а затем сохранять данные обратно в HDFS, поддерживая формат HFile.

Я нашел несколько руководств по массовой записи HFile из Spark в HDFS, однако я не уверен, как извлекать данные из HDFS. Какой тип DataFrame / RDD лучше всего подходит для такого рода задач?

Спасибо

1 Ответ

0 голосов
/ 04 февраля 2019

Ответьте себе, если это кому-то понадобится.

Возможно загрузить HFiles из снимка HBase.Выполните следующую процедуру: (в оболочке HBase) 1. отключите «пространство имен: таблица» 2. пространство имен снимка: таблица »« your_snapshot »

Это создаст доступный снимок, к которому можно получить доступ в / [HBase_path] /.snapshot / [your_snapshot]

Чтобы загрузить снимок как RDD [ImmutableBytesWritable, Result]

  def loadFromSnapshot(sc: SparkContext): RDD[ImmutableBytesWritable, Result] = {

val restorePath =
  new Path(s"hdfs://$storageDirectory/$restoreDirectory/$snapshotName")
val restorePathString = restorePath.toString

// create hbase conf starting from spark's hadoop conf
val hConf = HBaseConfiguration.create()
val hadoopConf = sc.hadoopConfiguration
HBaseConfiguration.merge(hConf, hadoopConf)

// point HBase root dir to snapshot dir
hConf.set("hbase.rootdir",
  s"hdfs://$storageDirectory/$snapshotDirectory/$snapshotName/")

// point Hadoop to the bucket as default fs
hConf.set("fs.default.name", s"hdfs://$storageDirectory/")

// configure serializations
hConf.setStrings("io.serializations",
  hadoopConf.get("io.serializations"),
  classOf[MutationSerialization].getName,
  classOf[ResultSerialization].getName,
  classOf[KeyValueSerialization].getName)

// disable caches
hConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)
hConf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f)
hConf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY)

// configure TableSnapshotInputFormat
hConf.set("hbase.TableSnapshotInputFormat.snapshot.name", settingsAccessor.settings.snapshotName)
hConf.set("hbase.TableSnapshotInputFormat.restore.dir", restorePathString)

val scan = new Scan()     // Fake scan which is applied by spark on HFile. Bypass RPC
val scanString = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
hConf.set(TableInputFormat.SCAN, scanString)

val job = Job.getInstance(hConf)

TableSnapshotInputFormat.setInput(job, settingsAccessor.settings.snapshotName, restorePath)

// create RDD
sc.newAPIHadoopRDD(job.getConfiguration,
  classOf[TableSnapshotInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
}

Это загрузит HFile из каталога снимков и применит полное поддельное сканированиеих, что позволяет избежать медленных удаленных вызовов процедур, но позволяет иметь тот же результат сканирования.

Когда вы закончите, вы можете снова включить вашу таблицу

  • включить 'nasmespace: table' По желанию вы также можете удалить снимок (данные на самом деле не будут удалены)
  • delete_snapshot 'your_snapshot'
...