Spark: как написать эффективный SQL-запрос для достижения этой цели - PullRequest
0 голосов
/ 19 мая 2018

У меня есть файл json, структура которого [{"time", "currentStop", "lat", "lon", "speed"}], вот пример:

[
  {"time":"2015-06-09 23:59:59","currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"},
  {"time":"2015-06-09 21:00:49","currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},
  {"time":"2015-06-09 21:55:49","currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}
]

ИЯ хочу получить результат json, который имеет структуру [{"hour", "value": ["currentStop", "lat", "lon", "speed"]}].Результат показывает ежечасные данные по различным («currentStop», «lat», «lon», «speed»).Вот результат примера (пропустите некоторые пустые значения):

[
  {"hour":0,"value":[]},
  {"hour":1,"value":[]},
  ......
  {"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}
  {"hour":23, "value": [{"currentStop":"xx","lat":22.264856,"lon":113.520450,"speed":25.30}]},
]  

Можно ли добиться этого с помощью запроса spark-sql?

Я использую spark с Java API и сЯ могу получить то, что хочу, но этот способ действительно неэффективен и стоит дорого.

Вот мой код:

Dataset<Row> bus_ic=spark.read().json(file);
bus_ic.createOrReplaceTempView("view");
StringBuilder text = new StringBuilder("[");
bus_ic.select(bus_ic.col("currentStop"),
            bus_ic.col("lon").cast("double"), bus_ic.col("speed").cast("double"),
            bus_ic.col("lat").cast("double"),bus_ic.col("LINEID"),
            bus_ic.col("time").cast("timestamp"))
            .createOrReplaceTempView("view");
StringBuilder sqlString = new StringBuilder();

for(int i = 0; i<24; i++){
   sqlString.delete(0,sqlString.length());

   sqlString.append("select currentStop, speed, lat, lon from view  where hour(time) = ")
           .append(i)
           .append(" group by currentStop, speed, lat, lon");
   Dataset<Row> t = spark.sql(sqlString.toString());
   text.append("{")
           .append("\"h\":").append(i)
           .append(",\"value\":")
           .append(t.toJSON().collectAsList().toString())
           .append("}");
   if(i!=23) text.append(",");
}
text.append("]");

Должны быть и другие способы решения этой проблемы.Как написать эффективный SQL-запрос для достижения этой цели?

1 Ответ

0 голосов
/ 19 мая 2018

Вы можете написать свой код гораздо более кратким способом (код Scala):

val bus_comb = bus_ic
  .groupBy(hour(to_timestamp(col("time"))).as("hour"))
  .agg(collect_set(struct(
    col("currentStop"), col("lat"), col("lon"), col("speed")
)).alias("value"));
bus_comb.toJSON.show(false);

// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                                                   |
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+
// |{"hour":23,"value":[{"currentStop":"xx","lat":"22.264856","lon":"113.520450","speed":"25.30"}]}                                                         |
// |{"hour":21,"value":[{"currentStop":"yy","lat":"22.263","lon":"113.52","speed":"34.5"},{"currentStop":"zz","lat":"21.3","lon":"113.521","speed":"13.7"}]}|
// +--------------------------------------------------------------------------------------------------------------------------------------------------------+

, но при наличии только 24 группирующих записей здесь нет возможности для масштабирования.Это может быть интересное упражнение, но это не то, что вы действительно можете применить к большому набору данных, где использование Spark имеет смысл.

Вы можете добавить пропущенные часы, присоединившись к range:

spark.range(0, 24).toDF("hour").join(bus_comb, Seq("hour"), "leftouter")
...