У меня есть требование, в котором мне нужно вычислять количество определенных значений на сегодня, когда поступит новая запись.
Входная запись выглядит следующим образом:
{"Floor_Id" : "Shop Floor 1",
"HaltRecord" : {
"HaltReason" : "Danahydraulic Error",
"Severity" : "Low",
"FaultErrorCategory" : "Docked",
"NonFaultErrorCategory" : null
},
"Description" : "Forklift",
"Category" : {
"Type" : "Halt",
"End_time" : NumberLong(2018-02-13T12:00:01),
"Start_time" : NumberLong(2018-02-13T12:00:00)
},
"Asset_Id" : 123,
"isError" : "y",
"Timestamp": 2018-02-13T12:00:01}
Выходответ должен выглядеть следующим образом:
{
"Floor_Id": "Galileo_001",
"Error_Category": [
{
"Category": "Operator Error",
"DataPoints":
{
"NumberOfErrors": 20,
"Date": 2018-02-13
}
},
{
"Category": "Dana Hydraulic Error",
"DataPoints": {
"NumberOfErrors": 15,
"Date": 2018-02-13
}
}
]
}
На данный момент я читаю запись из Kafka, извлекаю соответствующие поля и применяю фильтр к кадру данных.
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParam)
)
val schema = StructType(Seq(StructField("Floor_Id",StringType,true),
StructField("Category",StructType(Seq(StructField("Type",StringType,true),
StructField("EndTime",LongType,true),StructField("StartTime",LongType,true))),true),
StructField("HaltRecord",StructType(Seq(StructField("HaltReason",StringType,true),
StructField("Severity",StringType,true),StructField("FaultErrorCategory",StringType,true),StructField("NonFaultErrorCategory",StringType,true))),true),
StructField("Timestamp",StringType,true),
StructField("Asset_Id",IntegerType,true),
StructField("Description",StringType,true)
StructField("IsError",StringType,true)
stream.foreachRDD { rddRaw =>
val rdd = rddRaw.map(_.value.toString()) // or rddRaw.map(_._2)
val linesDataFrame=rdd.toDF("value")
val result =linesDataFrame.withColumn("value", from_json($"value", schema))
.select($"value.Floor_Id",$"value.IsError", $"value.Category.Type", $"value.Timestamp", $"value.HaltRecord.HaltReason")
val errorResult = result.map(row => (row.getString(0),row.getString(1),row.getString(2),row.getString(3),row.getString(4)))
.filter(x=> (x._1 == "Shop Floor 1" && x._2 == "y" && x._3 == "Halt"))
.map(x => (x._5,1L))
println(errorResult)
//println(errorResult.getClass) //Dataset
result.printSchema()
errorResult.show()
}
Нужны предложения о том, как можноЯ продолжаю, чтобы я мог агрегировать и продолжать обновлять состояние для текущего дня и сбросить состояние на следующий день?