Невозможно передать набор данных из SPARK в таблицу HBase - PullRequest
1 голос
/ 27 мая 2020

У меня проблема с переносом данных из фрейма данных в искру в созданную мной таблицу HBase. Проблема, похоже, в том, что этот метод может работать только с несколькими наборами данных в соответствии с примерами в Интернете, но у меня есть фрейм данных с информацией внутри. Пожалуйста, помогите!

// Таблица HBase

create 'weatherHB', 'STATIONID','OBSERVATIONTS','TEMPERATURE'

   def catalog = s"""{
   "table":{"namespace":"default", "name":"weatherHB"},
   "rowkey":"key",
   "columns":{
   "RecordID":{"cf":"RecordID","col":"key","type":"string"},
   "StationID":{"cf":"STATIONID","col":"stationID","type":"string"},
   "ObservationTSMonth":{"cf":"OBSERVATIONTS","col":"observationTSMonth","type":"string"},
   "ObservationTSDay":{"cf":"OBSERVATIONTS","col":"observationTSDay","type":"string"},
   "ObservationTSHour":{"cf":"OBSERVATIONTS","col":"observationTSHour","type":"string"},
   "Temperature":{"cf":"TEMPERATURE","col":"temp","type":"string"}
   }
   }""".stripMargin

   case class TempHeader(
   recordId: String,
   station: String,
   month: String,
   date: String,
   hour: String,
   temperature: Double)

   import spark.implicits._

   val weatherDF = spark.sparkContext.textFile("1902").
   map(
   rec => List (
   rec.substring(1,26).trim(),
   rec.substring(4,10).trim(),
   rec.substring(19,21).trim(),
   rec.substring(21,23).trim(),
   rec.substring(23,25).trim(),
   rec.substring(87,92).trim()
   ) ).
   map( att => TempHeader( att(0), att(1), att(2), att(3), att(4), (att(5).trim.toDouble)/10)).toDF()

   weatherDF.printSchema()

   weatherDF.createOrReplaceTempView("TEMP")

   val query = spark.sql("""SELECT month, max(temperature), min(temperature), avg(temperature) FROM TEMP GROUP BY month ORDER by month""".stripMargin)
   query.show(10)


   import org.apache.spark.sql.execution.datasources.hbase._

   weatherDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "6")).format(
   "org.apache.spark.sql.execution.datasources.hbase").save()




1 Ответ

0 голосов
/ 27 мая 2020

Похоже, ваш catalog имеет некоторые проблемы:

  • Семейство столбцов RecordID должно быть rowkey
  • Тип данных столбца temp должен быть double

Соответствующий пример приведен в документации коннектора Spark Hbase.

Ваш каталог должен выглядеть так (я не протестировал):

def catalog = s"""{
   |"table":{"namespace":"default", "name":"weatherHB"},
   |"rowkey":"key",
   |"columns":{
     |"key":{"cf":"rowkey","col":"key","type":"string"},
     |"stationid":{"cf":"STATIONID","col":"stationID","type":"string"},
     |"month":{"cf":"OBSERVATIONTS","col":"observationTSMonth","type":"string"},
     |"date":{"cf":"OBSERVATIONTS","col":"observationTSDay","type":"string"},
     |"hour":{"cf":"OBSERVATIONTS","col":"observationTSHour","type":"string"},
     |"temperature":{"cf":"TEMPERATURE","col":"temp","type":"double"}
     |}
|}""".stripMargin

Вдобавок ваш case class должен соответствовать этому каталогу:

case class TempHeader(
  key: String,
  stationid: String,
  month: String,
  date: String,
  hour: String,
  temperature: Double)
...