Извлечение и форматирование данных из HBase в scala Dataframe - PullRequest
1 голос
/ 27 апреля 2019

Я пытаюсь получить данные из таблицы hbase в среде Apache Spark, но не могу понять, как их отформатировать. Может ли кто-нибудь помочь мне.

case class systems( rowkey: String, iacp: Option[String], temp: Option[String])

type Record = (String, Option[String], Option[String])

val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam") 

scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
|        rowkey|             iacp|                temp|
+--------------+-----------------+--------------------+
|           ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
|           k6c|   0.056,NA,0.054|17.277,17.283,17.256|
|            ad|          NA,23.0|           24.0,23.6|
+--------------+-----------------+--------------------+

Тем не менее, я действительно хочу это как в следующем формате. Каждое значение, разделенное запятыми, находится в новой строке, и каждое значение NA заменяется значениями null . Значения в столбцах iacp и temp должны иметь тип float. Каждая строка может иметь различное количество значений, разделенных запятыми.

Заранее спасибо!

+--------------+-----------------+--------------------+
|        rowkey|             iacp|                temp|
+--------------+-----------------+--------------------+
|           ab7|            0.051|              17.326|
|           ab7|            0.052|              17.344|
|           ab7|            0.055|               17.21|
|           k6c|            0.056|              17.277|
|           k6c|             null|              17.283|
|           k6c|            0.054|              17.256|
|            ad|             null|                24.0|
|            ad|               23|                26.0|
+--------------+-----------------+--------------------+

1 Ответ

2 голосов
/ 28 апреля 2019

Ваша строка кода hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF должна генерировать кадр данных, эквивалентный следующему:

val df = Seq(
  ("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
  ("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
  ("ad", Some("NA,23.0"), Some("24.0,23.6"))
).toDF("rowkey", "iacp", "temp")

Чтобы преобразовать набор данных в требуемый результат, вы можете применить UDF, который объединяет элементы строк CSV iacp и temp, чтобы создать массив (Option[Double], Option[Double]), который затем explode -ed, как показано ниже:

import org.apache.spark.sql.functions._
import spark.implicits._

def pairUpCSV = udf{ (s1: String, s2: String) =>
  import scala.util.Try
  def toNumericArr(csv: String) = csv.split(",").map{
    case s if Try(s.toDouble).isSuccess => Some(s)
    case _ => None
  }
  toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
}

df.
  withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
  withColumn("csv_pair", explode($"csv_pairs")).
  select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
  show(false)
// +------+-----+------+
// |rowkey|iacp |temp  |
// +------+-----+------+
// |ab7   |0.051|17.326|
// |ab7   |0.052|17.344|
// |ab7   |0.055|17.21 |
// |k6c   |0.056|17.277|
// |k6c   |null |17.283|
// |k6c   |0.054|17.256|
// |ad    |null |24.0  |
// |ad    |23.0 |23.6  |
// +------+-----+------+

Обратите внимание, что значение NA попадает в регистр по умолчанию в методе toNumericArr, следовательно, не выделяется как отдельный случай. Кроме того, zipAll (вместо zip) используется в UDF для охвата случаев, когда строки CSV iacp и temp имеют разные размеры элементов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...