Потоковая передача OpenSkyData с помощью Spark Структурированная потоковая передача - PullRequest
0 голосов
/ 06 октября 2019

Я пытаюсь передать данные из API, который описан здесь https://opensky -network.org / apidoc / rest.html # all-state-vectors . Однако ответом является просто файл JSON, который содержит метку времени и матрицу, в которой каждая строка представляет собой вектор с данными о полете, такими как номер рейса, положение самолета и т. Д.

JSON выглядит следующим образом:

{"time":1570277165,"states":[[Data for flight1],[Data for flight2],...[Data for flightN]]}

где, например, [Данные для flight1] выглядят следующим образом:

["ab1644","UAL400  ","United States",1570277164,1570277165,-89.5318,40.6011,8770.62,false,242.02,41.38,-10.4,null,9197.34,"1370",false,0]

Теперь проблема заключается в том, как выполнить потоковую передачу с использованием искры. структурированный поток в Java. Пока что это мой подход:

SparkSession session = SparkSession.builder().appName("Spark_Streaming").master("local[2]").getOrCreate();
StructType schema = new StructType()
            .add("time", DataTypes.StringType)
            .add("states", 
DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.StringType)));
Dataset<Row> rawData = session.readStream().option("maxFilesPerTrigger",1).format("json").schema(schema).json("/Users/localuser/Documents/flight/data/*");
rawData.writeStream().format("console").start().awaitTermination();

, который отлично работает и дает мне

-------------------------------------------
+----------+--------------------+
|      time|              states|
+----------+--------------------+
|1570277165|[[ab1644, UAL400 ...|
+----------+--------------------+

Но проблема в том, что это соответствует следующей структуре:

root
 |-- states: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |     |    |-- element: string (containsNull = true)
 |-- time: long (nullable = true)

где у меня есть массив строк в завернутом в элемент States. Тем не менее, spark позволяет передавать данные с помощью схемы для данных. Для этого я создал следующие схемы

    StructType statesData = new StructType()
            .add("icao24", DataTypes.StringType)
            .add("callsign", DataTypes.StringType)
            .add("origin_country", DataTypes.StringType)
            .add("time_position", DataTypes.StringType)
            .add("last_contact", DataTypes.StringType)
            .add("longitude", DataTypes.StringType)
            .add("latitude", DataTypes.StringType)
            .add("baro_altitude", DataTypes.StringType)
            .add("on_ground", DataTypes.StringType)
            .add("velocity", DataTypes.StringType)
            .add("true_track", DataTypes.StringType)
            .add("vertical_rate", DataTypes.StringType)
            .add("sensors", DataTypes.createArrayType(DataTypes.IntegerType))
            .add("geo_altitude", DataTypes.StringType)
            .add("squawk", DataTypes.StringType)
            .add("spi", DataTypes.StringType)
            .add("position_source",DataTypes.StringType);

    StructType schema = new StructType()
            .add("time", DataTypes.StringType)
            .add("states", DataTypes.createArrayType(statesData));

Но если я использую эту схему, искра не найдет никаких файлов

+----+------+
|time|states|
+----+------+
|null|  null|
|null|  null|
+----+------+

Теперь я спрашиваю себя, существует ли альтернативный подход в использованиисхемы или ее адаптации, или есть ли вероятность сгладить входной массив для использования этой плоской структуры таким образом, чтобы ее можно было объединить с соответствующими метками из схемы, но я действительно сейчас не знаю.

Так что я с нетерпением жду вашего совета.

Привет, J.Bug

...