У меня есть вложенный фрейм данных «inputFlowRecordsAgg
» со следующей схемой
root
|-- FlowI.key: string (nullable = true)
|-- FlowS.minFlowTime: long (nullable = true)
|-- FlowS.maxFlowTime: long (nullable = true)
|-- FlowS.flowStartedCount: long (nullable = true)
|-- FlowI.DestPort: integer (nullable = true)
|-- FlowI.SrcIP: struct (nullable = true)
| |-- bytes: binary (nullable = true)
|-- FlowI.DestIP: struct (nullable = true)
| |-- bytes: binary (nullable = true)
|-- FlowI.L4Protocol: byte (nullable = true)
|-- FlowI.Direction: byte (nullable = true)
|-- FlowI.Status: byte (nullable = true)
|-- FlowI.Mac: string (nullable = true)
Требуется преобразовать во вложенный набор данных следующих классов case
case class InputFlowV1(val FlowI: FlowI,
val FlowS: FlowS)
case class FlowI(val Mac: String,
val SrcIP: IPAddress,
val DestIP: IPAddress,
val DestPort: Int,
val L4Protocol: Byte,
val Direction: Byte,
val Status: Byte,
var key: String = "")
case class FlowS(var minFlowTime: Long,
var maxFlowTime: Long,
var flowStartedCount: Long)
, но когда Я пытаюсь преобразовать его с помощью inputFlowRecordsAgg.as [InputFlowV1]
cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
org.apache.spark.sql.AnalysisException: cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
Один комментарий попросил меня указать полный код, вот он
def getReducedFlowR(inputFlowRecords: Dataset[InputFlowV1],
@transient spark: SparkSession): Dataset[InputFlowV1]={
val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "FlowI.key")
.agg(min("FlowS.minFlowTime") as "FlowS.minFlowTime" , max("FlowS.maxFlowTime") as "FlowS.maxFlowTime",
sum("FlowS.flowStartedCount") as "FlowS.flowStartedCount"
, first("FlowI.Mac") as "FlowI.Mac"
, first("FlowI.SrcIP") as "FlowI.SrcIP" , first("FlowI.DestIP") as "FlowI.DestIP"
,first("FlowI.DestPort") as "FlowI.DestPort"
, first("FlowI.L4Protocol") as "FlowI.L4Protocol"
, first("FlowI.Direction") as "FlowI.Direction" , first("FlowI.Status") as "FlowI.Status")
inputFlowRecordsAgg.printSchema()
return inputFlowRecordsAgg.as[InputFlowV1]
}