Я новичок в обработке структурированной потоковой передачи Spark и в настоящее время работаю над одним вариантом использования, когда приложение структурированной потоковой передачи будет получать события из концентратора-концентратора событий IoT Azure (скажем, каждые 20 секунд).
Задачасостоит в том, чтобы потреблять эти события и обрабатывать их в режиме реального времени. Для этого я написал ниже Spark Структурированная потоковая программа на Spark-Java.
Ниже приведены важные моменты
- В настоящее время я применил оконную операцию с интервалом 10 минут и с 5 минутами. раздвижное окно.
- Водяной знак применяется к параметру eventDate с интервалом в 10 минут.
- В настоящее время я не выполняю никаких других операций и просто храню их в указанном месте в формате Parquet.
- Программа хранит одно событие в одном файле.
Вопросы:
- Можно ли сохранить несколько событий в формате паркета в файле на основе времени окна?
- Как работает оконная операция в этом случае?
- Также я хотел бы проверить состояние события с предыдущим событием и на основе некоторых вычислений (скажем, событие не получено в течение 5 минут) Я хочудля обновления состояния.
...
public class EventSubscriber {
public static void main(String args[]) throws InterruptedException, StreamingQueryException {
String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";
String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();
EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
.setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
.setReceiverTimeout(java.time.Duration.ofMinutes(10));
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");
SparkSession spSession = SparkSession.builder()
.appName("IoT Spark Streaming")
.config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
.getOrCreate();
Dataset<Row> inputStreamDF = spSession.readStream()
.format("eventhubs")
.options(eventHubsConf.toMap())
.load();
Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");
StructType jsonStruct = new StructType()
.add("eventType", DataTypes.StringType)
.add("payload", DataTypes.StringType);
Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
String valStr = value.getString(0).toString();
String payload = valStr;
Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);
JsonElement methodName = jsonObj.get("method");
String eventType = null;
if(methodName != null) {
eventType = "OTHER_EVENT";
} else {
eventType = "DEVICE_EVENT";
}
Row jsonRow = RowFactory.create(eventType, payload);
return jsonRow;
}, RowEncoder.apply(jsonStruct));
messageRow.printSchema();
Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");
deviceEventRowDS.printSchema();
Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {
String jsonString = value.getString(1).toString();
Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
return deviceEvent;
}, Encoders.bean(DeviceEvent.class));
deviceEventDS.printSchema();
Dataset<Row> messageDataset = deviceEventDS.select(
functions.col("eventType"),
functions.col("deviceID"),
functions.col("description"),
functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
functions.col("deviceModel"),
functions.col("pingRate"))
.select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");
messageDataset.printSchema();
Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
.groupBy(functions.col("deviceID"),
functions.window(
functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();
devWindowDataset.printSchema();
StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
.format("parquet")
.option("truncate", "false")
.option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
.start();
query.awaitTermination();
}}
...
Любая помощь или указания по этому вопросу будут полезны.
Спасибо и всего наилучшего,
Авинаш Дешмукх