Как извлечь данные из одной ячейки в несколько столбцов, используя Scala в Spark - PullRequest
0 голосов
/ 05 мая 2019

У меня есть набор данных с разделителем пробелов в столбце "_raw"

Мне нужно извлечь данные из этого столбца в несколько столбцов

"_ raw" столбец:

Device          rReq_PS      wReq_PS        rKB_PS        wKB_PS  avgWaitMillis   avgSvcMillis   bandwUtilPct

sda                7.00         0.00         64.00          0.00           8.71           8.43           5.90

sdc                0.00         0.00          0.00          0.00           0.00           0.00           0.00

sdb                5.00        10.00         32.00         40.00           2.67           2.67           4.00

dm-0               1.00         0.00          8.00          0.00           9.00           9.00           0.90

dm-1               6.00         0.00         56.00          0.00           8.67           8.33           5.00

dm-2               5.00        10.00         32.00         40.00           2.67           2.67           4.00
dm-3               0.00         0.00          0.00          0.00           0.00           0.00           0.00

dm-4               0.00         0.00          0.00          0.00           0.00           0.00           0.00

dm-5               0.00         0.00          0.00          0.00           0.00           0.00           0.00

dm-6               0.00         0.00          0.00          0.00           0.00           0.00           0.00

dm-7               0.00         0.00          0.00          0.00           0.00           0.00           0.00

dm-8               0.00         0.00          0.00          0.00           0.00           0.00           0.00

dm-9               0.00         0.00          0.00          0.00           0.00           0.00           0.00

Мне удалось извлечь с определенной строкой, но не удалось получить ее со всеми строками

val log = spark.read.format("com.databricks.spark.csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("sep", ",")
      .option("delimiter", "|")
      .option("multiLine", "true")
      .load("query4.csv").cache()

    log.createOrReplaceTempView("logs")
    val df = spark.sql("select _time, _raw, host from logs")

    import spark.implicits._
    val extractbandwUtilPct
    = udf{(raw: String) => raw
      .split("\n")
      .map(_.split(" +"))
      .find(_(0) == "sda")
      .map(_(7)).getOrElse("unknown")}



    val extractedData = df.filter(
      $"host" === "ausflscgap01.us.dell.com" ||
        $"host" ==="ausflscgap02.us.dell.com" ||
        $"host" === "ausplscgap01.us.dell.com" ||
        $"host" === "ausplscgap02.us.dell.com")
      .withColumn("bandwUtilPct", extractbandwUtilPct($"_raw")).drop("_raw").show()

Мне нужно извлечь столбец _raw с двумя новыми столбцами «Device» и «bandwUtilPct»

Device  bandwUtilPct
sda     5.90
sdc     0.00
sbd     4.00
dm-0    0.90
dm-1    5.00
dm-2    4.00
dm-3    0.00
'
'
'
dm-9    0.00




import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, _}
import org.apache.spark.sql.functions._
object IOSTAT_extracted_data {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("IOSTAT")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val log = spark.read.format("com.databricks.spark.csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("multiLine", "true")
      .load("query4.csv").cache()

   val df = log.select("_time", "_raw", "host").toDF()

    import spark.implicits._
    case class RawInfo(Device: String, bandwUtilPct: String)

    val extractRawInfo = udf{raw: String =>
      val all = raw
        .split("\n")
        .map(_.split(" +"))
        .find(_(0) == "sda")

      def getValue(pos: Int) = all.map(_(pos)).getOrElse("unknown")

      RawInfo(
        Device = getValue(0),
        bandwUtilPct = getValue(7))
    }

    val extractedData = df.filter($"host".isin("ausflscgap01.us.dell.com", "ausflscgap02.us.dell.com", "ausplscgap01.us.dell.com", "ausplscgap02.us.dell.com"))
      .withColumn("info", extractRawInfo($"_raw"))
      .select("info.device", "info.bandwUtilPct", "host", "time")
      .drop("info")
      .show()

//    extractedData.coalesce(1).write.format("csv").option("header", "true").option("sep", ",").option("multiLine", "true").save("IOSTAT_extracted_data")
  }
}

1 Ответ

2 голосов
/ 05 мая 2019

Вы можете вернуть несколько значений из udf: Array [...] или case case.Лично я предпочитаю тематические классы, подобные этому:

case class RawInfo(device: String, bandwUtilPct: String)

val extractRawInfo = udf{(raw: String) =>
      val all = raw
        .split("\n")
        .map(_.split(" +"))
        .find(_(0) == "sda")

      def getValue(pos: Int) = all.map(_(pos)).getOrElse("unknown")

      RawInfo(
        device = getValue(0),
        bandwUtilPct = getValue(7))
    }

    df.filter($"host".isin("ausflscgap01.us.dell.com", "ausflscgap02.us.dell.com", "ausplscgap01.us.dell.com", "ausplscgap02.us.dell.com"))
      .withColumn("info", extractRawInfo($"_raw"))
      .select(......, "info.device", "info.bandwUtilPct")
      .show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...