spark scala преобразовать вложенный фрейм данных во вложенный набор данных - PullRequest
0 голосов
/ 09 мая 2020

У меня есть вложенный фрейм данных «inputFlowRecordsAgg» со следующей схемой

root
 |-- FlowI.key: string (nullable = true)
 |-- FlowS.minFlowTime: long (nullable = true)
 |-- FlowS.maxFlowTime: long (nullable = true)
 |-- FlowS.flowStartedCount: long (nullable = true)
 |-- FlowI.DestPort: integer (nullable = true)
 |-- FlowI.SrcIP: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |-- FlowI.DestIP: struct (nullable = true)
 |    |-- bytes: binary (nullable = true)
 |-- FlowI.L4Protocol: byte (nullable = true)
 |-- FlowI.Direction: byte (nullable = true)
 |-- FlowI.Status: byte (nullable = true)
 |-- FlowI.Mac: string (nullable = true)

Требуется преобразовать во вложенный набор данных следующих классов case

case class InputFlowV1(val FlowI: FlowI,
                             val FlowS: FlowS)

case class FlowI(val Mac: String,
                 val SrcIP: IPAddress,
                 val DestIP: IPAddress,
                 val DestPort: Int,
                 val L4Protocol: Byte,
                 val Direction: Byte,
                 val Status: Byte,
                 var key: String = "")

case class FlowS(var minFlowTime: Long,
                          var maxFlowTime: Long,
                          var flowStartedCount: Long)

, но когда Я пытаюсь преобразовать его с помощью inputFlowRecordsAgg.as [InputFlowV1]

cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
org.apache.spark.sql.AnalysisException: cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

Один комментарий попросил меня указать полный код, вот он

def getReducedFlowR(inputFlowRecords: Dataset[InputFlowV1],
                            @transient spark: SparkSession): Dataset[InputFlowV1]={


     val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "FlowI.key")
      .agg(min("FlowS.minFlowTime") as "FlowS.minFlowTime" , max("FlowS.maxFlowTime") as "FlowS.maxFlowTime",
        sum("FlowS.flowStartedCount") as "FlowS.flowStartedCount" 
        , first("FlowI.Mac") as "FlowI.Mac"
        , first("FlowI.SrcIP") as "FlowI.SrcIP" , first("FlowI.DestIP") as "FlowI.DestIP"
        ,first("FlowI.DestPort") as "FlowI.DestPort"
        , first("FlowI.L4Protocol") as "FlowI.L4Protocol"
        , first("FlowI.Direction") as "FlowI.Direction" , first("FlowI.Status") as "FlowI.Status")

        inputFlowRecordsAgg.printSchema()

        return inputFlowRecordsAgg.as[InputFlowV1]

        }

1 Ответ

0 голосов
/ 09 мая 2020

Причина в том, что ваша схема класса дела не соответствует фактической схеме данных. Пожалуйста, проверьте схему класса дела ниже. попробуйте сопоставить схему класса вашего дела со схемой данных, он будет работать.

Схема вашего класса дела:

scala> df.printSchema
root
 |-- FlowI: struct (nullable = true)
 |    |-- Mac: string (nullable = true)
 |    |-- SrcIP: string (nullable = true)
 |    |-- DestIP: string (nullable = true)
 |    |-- DestPort: integer (nullable = false)
 |    |-- L4Protocol: byte (nullable = false)
 |    |-- Direction: byte (nullable = false)
 |    |-- Status: byte (nullable = false)
 |    |-- key: string (nullable = true)
 |-- FlowS: struct (nullable = true)
 |    |-- minFlowTime: long (nullable = false)
 |    |-- maxFlowTime: long (nullable = false)
 |    |-- flowStartedCount: long (nullable = false)

Попробуйте изменить свой код, как показано ниже, теперь он должен работать.

val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "key")
      .agg(min("FlowS.minFlowTime") as "minFlowTime" , max("FlowS.maxFlowTime") as "maxFlowTime",
        sum("FlowS.flowStartedCount") as "flowStartedCount" 
        , first("FlowI.Mac") as "Mac"
        , first("FlowI.SrcIP") as "SrcIP" , first("DestIP") as "DestIP"
        ,first("FlowI.DestPort") as "DestPort"
        , first("FlowI.L4Protocol") as "L4Protocol"
        , first("FlowI.Direction") as "Direction" , first("FlowI.Status") as "Status")
        .select(struct($"key",$"Mac",$"SrcIP",$"DestIP",$"DestPort",$"L4Protocol",$"Direction",$"Status").as("FlowI"),struct($"flowStartedCount",$"minFlowTime",$"maxFlowTime").as("FlowS")) // add this line & change based on your columns .. i have added roughly..:)

...