Я пытаюсь передать данные из API, который описан здесь https://opensky -network.org / apidoc / rest.html # all-state-vectors . Однако ответом является просто файл JSON, который содержит метку времени и матрицу, в которой каждая строка представляет собой вектор с данными о полете, такими как номер рейса, положение самолета и т. Д.
JSON выглядит следующим образом:
{"time":1570277165,"states":[[Data for flight1],[Data for flight2],...[Data for flightN]]}
где, например, [Данные для flight1] выглядят следующим образом:
["ab1644","UAL400 ","United States",1570277164,1570277165,-89.5318,40.6011,8770.62,false,242.02,41.38,-10.4,null,9197.34,"1370",false,0]
Теперь проблема заключается в том, как выполнить потоковую передачу с использованием искры. структурированный поток в Java. Пока что это мой подход:
SparkSession session = SparkSession.builder().appName("Spark_Streaming").master("local[2]").getOrCreate();
StructType schema = new StructType()
.add("time", DataTypes.StringType)
.add("states",
DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.StringType)));
Dataset<Row> rawData = session.readStream().option("maxFilesPerTrigger",1).format("json").schema(schema).json("/Users/localuser/Documents/flight/data/*");
rawData.writeStream().format("console").start().awaitTermination();
, который отлично работает и дает мне
-------------------------------------------
+----------+--------------------+
| time| states|
+----------+--------------------+
|1570277165|[[ab1644, UAL400 ...|
+----------+--------------------+
Но проблема в том, что это соответствует следующей структуре:
root
|-- states: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- time: long (nullable = true)
где у меня есть массив строк в завернутом в элемент States. Тем не менее, spark позволяет передавать данные с помощью схемы для данных. Для этого я создал следующие схемы
StructType statesData = new StructType()
.add("icao24", DataTypes.StringType)
.add("callsign", DataTypes.StringType)
.add("origin_country", DataTypes.StringType)
.add("time_position", DataTypes.StringType)
.add("last_contact", DataTypes.StringType)
.add("longitude", DataTypes.StringType)
.add("latitude", DataTypes.StringType)
.add("baro_altitude", DataTypes.StringType)
.add("on_ground", DataTypes.StringType)
.add("velocity", DataTypes.StringType)
.add("true_track", DataTypes.StringType)
.add("vertical_rate", DataTypes.StringType)
.add("sensors", DataTypes.createArrayType(DataTypes.IntegerType))
.add("geo_altitude", DataTypes.StringType)
.add("squawk", DataTypes.StringType)
.add("spi", DataTypes.StringType)
.add("position_source",DataTypes.StringType);
StructType schema = new StructType()
.add("time", DataTypes.StringType)
.add("states", DataTypes.createArrayType(statesData));
Но если я использую эту схему, искра не найдет никаких файлов
+----+------+
|time|states|
+----+------+
|null| null|
|null| null|
+----+------+
Теперь я спрашиваю себя, существует ли альтернативный подход в использованиисхемы или ее адаптации, или есть ли вероятность сгладить входной массив для использования этой плоской структуры таким образом, чтобы ее можно было объединить с соответствующими метками из схемы, но я действительно сейчас не знаю.
Так что я с нетерпением жду вашего совета.
Привет, J.Bug