У меня есть набор данных с разделителем пробелов в столбце "_raw"
Мне нужно извлечь данные из этого столбца в несколько столбцов
"_ raw" столбец:
Device rReq_PS wReq_PS rKB_PS wKB_PS avgWaitMillis avgSvcMillis bandwUtilPct
sda 7.00 0.00 64.00 0.00 8.71 8.43 5.90
sdc 0.00 0.00 0.00 0.00 0.00 0.00 0.00
sdb 5.00 10.00 32.00 40.00 2.67 2.67 4.00
dm-0 1.00 0.00 8.00 0.00 9.00 9.00 0.90
dm-1 6.00 0.00 56.00 0.00 8.67 8.33 5.00
dm-2 5.00 10.00 32.00 40.00 2.67 2.67 4.00
dm-3 0.00 0.00 0.00 0.00 0.00 0.00 0.00
dm-4 0.00 0.00 0.00 0.00 0.00 0.00 0.00
dm-5 0.00 0.00 0.00 0.00 0.00 0.00 0.00
dm-6 0.00 0.00 0.00 0.00 0.00 0.00 0.00
dm-7 0.00 0.00 0.00 0.00 0.00 0.00 0.00
dm-8 0.00 0.00 0.00 0.00 0.00 0.00 0.00
dm-9 0.00 0.00 0.00 0.00 0.00 0.00 0.00
Мне удалось извлечь с определенной строкой, но не удалось получить ее со всеми строками
val log = spark.read.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("sep", ",")
.option("delimiter", "|")
.option("multiLine", "true")
.load("query4.csv").cache()
log.createOrReplaceTempView("logs")
val df = spark.sql("select _time, _raw, host from logs")
import spark.implicits._
val extractbandwUtilPct
= udf{(raw: String) => raw
.split("\n")
.map(_.split(" +"))
.find(_(0) == "sda")
.map(_(7)).getOrElse("unknown")}
val extractedData = df.filter(
$"host" === "ausflscgap01.us.dell.com" ||
$"host" ==="ausflscgap02.us.dell.com" ||
$"host" === "ausplscgap01.us.dell.com" ||
$"host" === "ausplscgap02.us.dell.com")
.withColumn("bandwUtilPct", extractbandwUtilPct($"_raw")).drop("_raw").show()
Мне нужно извлечь столбец _raw с двумя новыми столбцами «Device» и «bandwUtilPct»
Device bandwUtilPct
sda 5.90
sdc 0.00
sbd 4.00
dm-0 0.90
dm-1 5.00
dm-2 4.00
dm-3 0.00
'
'
'
dm-9 0.00
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, _}
import org.apache.spark.sql.functions._
object IOSTAT_extracted_data {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("IOSTAT")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val log = spark.read.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("multiLine", "true")
.load("query4.csv").cache()
val df = log.select("_time", "_raw", "host").toDF()
import spark.implicits._
case class RawInfo(Device: String, bandwUtilPct: String)
val extractRawInfo = udf{raw: String =>
val all = raw
.split("\n")
.map(_.split(" +"))
.find(_(0) == "sda")
def getValue(pos: Int) = all.map(_(pos)).getOrElse("unknown")
RawInfo(
Device = getValue(0),
bandwUtilPct = getValue(7))
}
val extractedData = df.filter($"host".isin("ausflscgap01.us.dell.com", "ausflscgap02.us.dell.com", "ausplscgap01.us.dell.com", "ausplscgap02.us.dell.com"))
.withColumn("info", extractRawInfo($"_raw"))
.select("info.device", "info.bandwUtilPct", "host", "time")
.drop("info")
.show()
// extractedData.coalesce(1).write.format("csv").option("header", "true").option("sep", ",").option("multiLine", "true").save("IOSTAT_extracted_data")
}
}