Если я вас правильно понял, и у вас есть " Фактические данные в строке " в текстовом поле _raw в таблице logs , вам нужно как то так:
import org.apache.spark.sql.functions._
val extractPctIdle = udf{(raw: String) =>
raw
.split("\n")
.map(_.split("\t"))
.find(_(0) == "all")
.map(_(5))
.getOrElse("unknown")
}
val extractedData = df.filter(
$"host" === "ausflscgap01.us.dell.com" ||
$"host" ==="ausflscgap02.us.dell.com" ||
$"host" === "ausplscgap01.us.dell.com" ||
$"host" === "ausplscgap02.us.dell.com")
.withColumn("pctIdle", extractPctIdle($"_raw"))
.show()
т.е. Вы можете проанализировать свое _raw поле с помощью пользовательского udf.
Это простейшая версия, но лучше сделать некоторую обработку ошибок в случае неправильно сформированного поля _raw.
Дело было смоделировано следующим образом:
case class R(host: String, _raw: String)
val df = Seq(
R("ausflscgap02.us.dell.com", "CPU\tpctUser\tpctNice\tpctSystem\tpctIowait\tpctIdle\nall\t9.55\t0.00\t36.18\t1.51\t52.76\n0\t10.00\t0.00\t37.00\t4.00\t49.00\n1\t9.00\t0.00\t34.00\t0.00\t57.00"),
R("ausplscgap01.us.dell.com", "CPU\tpctUser\tpctNice\tpctSystem\tpctIowait\tpctIdle\nall\t9.55\t0.00\t36.18\t1.51\t52.76\n0\t10.00\t0.00\t37.00\t4.00\t49.00\n1\t9.00\t0.00\t34.00\t0.00\t57.00")
).toDF()
Редактировать
Если вам нужны данные из нескольких столбцов внутри _raw:
case class RawInfo(pctUser: String, pctIdle: String)
val extractRawInfo = udf{(raw: String) =>
val all = raw
.split("\n")
.map(_.split("\t"))
.find(_(0) == "all")
def getValue(pos: Int) = all.map(_(pos)).getOrElse("unknown")
RawInfo(
pctUser = getValue(1),
pctIdle = getValue(5))
}
df.filter($"host".isin("ausflscgap01.us.dell.com", "ausflscgap02.us.dell.com", "ausplscgap01.us.dell.com", "ausplscgap02.us.dell.com"))
.withColumn("info", extractRawInfo($"_raw"))
.select("host", "info.pctUser", "info.pctIdle")
.show()
Примечание: возможно, можно вернуть только массив [string] из udf и позже извлечь определенные столбцы (например, $ "info" (0) .as ("pctUser")), но я предпочитаю типизированное решение, показанное выше.