Spark Structured Streaming - обработка событий с операцией Window при обработке потоков с сохранением состояния - PullRequest
0 голосов
/ 18 октября 2019

Я новичок в обработке структурированной потоковой передачи Spark и в настоящее время работаю над одним вариантом использования, когда приложение структурированной потоковой передачи будет получать события из концентратора-концентратора событий IoT Azure (скажем, каждые 20 секунд).

Задачасостоит в том, чтобы потреблять эти события и обрабатывать их в режиме реального времени. Для этого я написал ниже Spark Структурированная потоковая программа на Spark-Java.

Ниже приведены важные моменты

  1. В настоящее время я применил оконную операцию с интервалом 10 минут и с 5 минутами. раздвижное окно.
  2. Водяной знак применяется к параметру eventDate с интервалом в 10 минут.
  3. В настоящее время я не выполняю никаких других операций и просто храню их в указанном месте в формате Parquet.
  4. Программа хранит одно событие в одном файле.

Вопросы:

  1. Можно ли сохранить несколько событий в формате паркета в файле на основе времени окна?
  2. Как работает оконная операция в этом случае?
  3. Также я хотел бы проверить состояние события с предыдущим событием и на основе некоторых вычислений (скажем, событие не получено в течение 5 минут) Я хочудля обновления состояния.

...

public class EventSubscriber {

   public static void main(String args[]) throws InterruptedException, StreamingQueryException {

    String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";

    String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();

    EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
            .setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
            .setReceiverTimeout(java.time.Duration.ofMinutes(10));

    SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");

    SparkSession spSession = SparkSession.builder()
            .appName("IoT Spark Streaming")
            .config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
            .getOrCreate();

    Dataset<Row> inputStreamDF = spSession.readStream()
            .format("eventhubs")
            .options(eventHubsConf.toMap())
            .load();

    Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");

    StructType jsonStruct = new StructType()
            .add("eventType", DataTypes.StringType)
            .add("payload", DataTypes.StringType);

    Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
        String valStr = value.getString(0).toString();

        String payload = valStr;

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);

        JsonElement methodName = jsonObj.get("method");

        String eventType = null;
        if(methodName != null) {
            eventType = "OTHER_EVENT";
        } else {
            eventType = "DEVICE_EVENT";
        }

        Row jsonRow = RowFactory.create(eventType, payload);
        return jsonRow;

    }, RowEncoder.apply(jsonStruct));

    messageRow.printSchema();

    Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");

    deviceEventRowDS.printSchema();

    Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {

        String jsonString = value.getString(1).toString();

        Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

        DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
        DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
        return deviceEvent;

    }, Encoders.bean(DeviceEvent.class));

    deviceEventDS.printSchema();

    Dataset<Row> messageDataset = deviceEventDS.select(
            functions.col("eventType"), 
            functions.col("deviceID"),
            functions.col("description"),
            functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
            functions.col("deviceModel"),
            functions.col("pingRate"))
            .select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");

    messageDataset.printSchema();

    Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
            .groupBy(functions.col("deviceID"),
                    functions.window(
                            functions.col("eventDate"), "10 minutes", "5 minutes"))
            .count();

    devWindowDataset.printSchema();

    StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
            .format("parquet")
            .option("truncate", "false")
            .option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
            .start();

    query.awaitTermination();
}}

...

Любая помощь или указания по этому вопросу будут полезны.

Спасибо и всего наилучшего,

Авинаш Дешмукх

1 Ответ

0 голосов
/ 21 октября 2019

Можно ли сохранить несколько событий в формате паркета в файле на основе времени окна?

Да.

Как работает окноработает в этом случае?

Следующий код является основной частью приложения Spark Structured Streaming:

Dataset<Row> devWindowDataset = messageDataset
  .withWatermark("eventDate", "10 minutes")
  .groupBy(
    functions.col("deviceID"),
    functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
  .count();

Это говорит о том, что базовые хранилища состояний должны сохранять состояниеза deviceID и eventDate за 10 минут и дополнительно 10 minutes (за withWatermark) за поздние события. Другими словами, вы должны видеть результаты, появляющиеся, когда событие имеет eventDate 20 минут после начала потокового запроса.

withWatermark - для поздних событий, поэтому даже когда groupBy выдастрезультат не будет получен, пока не будет превышен порог водяного знака.

И та же самая процедура применяется каждые 10 минут (+ 10 минут водяного знака) с 5-минутным слайдом окна.

Думайте о groupBy с оператором window как о множественном агрегировании.

Также я хотел бы проверить состояние события с предыдущим событием и на основе некоторых вычислений (скажем, событиене получено в течение 5 минут) Я хочу обновить состояние.

Это звучит как сценарий использования оператора KeyValueGroupedDataset.flatMapGroupsWithState (он же Произвольная агрегация потоков с отслеживанием состояния ). Консультируйтесь с Произвольными операциями с отслеживанием состояния .

Также возможно, что вам понадобится просто одна из многих стандартных функций агрегирования или пользовательская функция агрегации (UDAF) .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...