Входной набор данных
Dataset<Row> inputDS = spark.read.format("avro").path("hdfs://namenode:8020/..")
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188264901 | 0002019000000| 0 | 0 |Value | 5 |
|1554188264901 | 0002019000000| 0 | 0 |SetPoint | 7 |
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188276412 | 0002019000000| 0 | 0 |SetPoint | 10 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
|1554188639407 | 0002019000000| 0 | 0 |Voltage | 3 |
+---------------+---------------+----------------+-------+--------------+--------+
Промежуточный набор данных
inputDS.createOrReplaceTempView("abc");
Dataset<Row> intermediateDS<Row> =
spark.sql("select time,thingId,controller,module,variableName,value,count(time) over (partition by time) as time_count from abc")
.filter("time_count=1").drop("time_count");
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
|1554188639407 | 0002019000000| 0 | 0 |Voltage | 3 |
+---------------+---------------+----------------+-------+--------------+--------+
Промежуточный набор данных - это не что иное, как столбец времени, который встречался только один раз, как указано выше.
Требуемый набор выходных данных
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
|1554188639406 | 0002019000000| 0 | 0 |Voltage | 9 | // last non null value for the set (thingId, controller, module) and variableName='Voltage'
|1554188639407 | 0002019000000| 0 | 0 |Voltage | 3 |
|1554188639407 | 0002019000000| 0 | 0 |SetPoint | 10 | // last non null value for the set (thingId, controller, module) and variableName='SetPoint'
+---------------+---------------+----------------+-------+--------------+--------+
Чтобы получить требуемый вывод, я пытался использовать UDF, как показано ниже
spark.udf().register("getLastvalue_udf",getValue,DataType.StringType);
intermediateDS=intermediateDS.withColumn("Last_Value",callUDF("getLastvalue_udf",col("variableName")));
UDF1<String,String> getValue = new UDF1<String,String>(){
@Override
public String call(String t1){
String variableName="";
if(t1=="SetPoint"){
variableName="Voltage";
}else{
variableName="SetPoint";
}
String value = String.valueOf(spark.sql("SELECT LAST(value) OVER (order by time desc) as value from abc where "
+" variableName="+ variableName +") limit 1")
return value;
}
но UDF
только что вернулся [value:String]
. spark.sql()
не работает внутри UDF.
1.) Как получить требуемый результат из UDF выше или предложить мне любой другой обходной путь.
2.) Можно ли вызвать spark sql внутри функции карты?
Спасибо.