Spark Streaming: агрегирование потока данных несколько раз - PullRequest
0 голосов
/ 01 октября 2018

У меня есть задание потоковой передачи, которое считывает данные из нескольких тем Кафки.Теперь я хочу объединить данные по нескольким window intervals и сохранить их в базе данных.

Возможно ли так?Или мне понадобится отдельное задание зажигания для выполнения другого уровня агрегации.

Потеряны ли данные при обработке набора данных во второй раз?

Агрегация 1:

private void buildStream1(Dataset<Row> sourceDataset) {
      Dataset<Row> query = sourceDataset
        .withWatermark("timestamp", "120 seconds")
        .select(
            col("timestamp"),
            col("datacenter"),
            col("platform")
        )
        .groupBy(
            functions.window(col("timestamp"), 120 seconds, 60 seconds).as("timestamp"),
            col("datacenter"),
            col("platform")
        )
        .agg(
            count(lit(1)).as("count")
        );
      startKafkaStream(query);
  }

Агрегация2:

private void buildStream1(Dataset<Row> sourceDataset) {
      Dataset<Row> query = sourceDataset
        .withWatermark("timestamp", "10 minutes")
        .select(
            col("timestamp"),
            col("datacenter"),
            col("platform")
        )
        .groupBy(
            functions.window(col("timestamp"), 10 minutes, 5 minutes).as("timestamp"),
            col("datacenter"),
            col("platform")
        )
        .agg(
            count(lit(1)).as("count")
        );
      startKafkaStream(query);
  }

Запись обоих потоков:

private void startKafkaStream(Dataset<Row> aggregatedDataset) {
    aggregatedDataset
        .select(to_json(struct("*")).as("value"))
        .writeStream()
        .outputMode(OutputMode.Append())
        .option("truncate", false)
        .format("console")
        .trigger(Trigger.ProcessingTime(10 minutes))
        .start();
  }
...