Процесс детализации следующим образом:
- загрузка потоковой передачи данных с Kafka
- используйте KeyValueGroupedDataset для группировки потоков по атрибуту rule_id
- для каждого rule_id, значение KeyValueGroupedDataset mapGroups для набора данных и агрегирование набора данных с использованием SQL и вывод результата на консоль.
При отправке задания возникает исключение «Задача не сериализуема»
Метод выполнения задания:
spark = SparkSession.builder().appName("SparkSQLBenchmark").getOrCreate();
StructType schema = new StructType()
.add("rule_id", "string")
.add("metric", "string")
.add("m_value", "double")
.add("event_time", "timestamp");
// Create DataSet representing the stream of input lines from Kafka
Dataset<Row> metrics = spark.readStream()
.format("kafka")
.load()
.select(from_json(col("value").cast("string"), schema).as("metrics"))
.select("metrics.*")
.withWatermark("event_time", "10 seconds");
String[] ruleIds = new String[] {"1", "2"};
// use KeyValueGroupedDataset group stream by rule_id
KeyValueGroupedDataset kvGroupedDataSet = metrics.groupByKey
(new MapFunction<Row, String>() {
@Override
public String call(Row row) throws Exception {
return row.getAs("rule_id");
}
}, Encoders.STRING());
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
StreamingQuery[] queries = new StreamingQuery[ruleIds.length];
int i = 0;
// for each group, aggregate value use SQL
for(final String id : ruleIds) {
Dataset<Row> query = kvGroupedDataSet.mapGroups(new MapGroupsFunction<String, Row, Row>() {
@Override
public Row call(String key, Iterator<Row> iterator) throws Exception {
if (id.equals(key)) {
while (iterator.hasNext()) {
return iterator.next();
}
}
return RowFactory.create();
}
}, encoder);
query.createOrReplaceTempView("grouped_metrics_" + id);
final Dataset<Row> result = spark.sql("SELECT WINDOW(event_time, 10 seconds).start AS time_window, " +
"SUM(m_value) AS sum_value " +
"FROM grouped_metrics_" + "_" + id + " " +
"GROUP BY WINDOW(event_time, 10 seconds), metric");
// output result to console
StreamingQuery queryResult = result.writeStream()
.outputMode(OutputMode.Update())
.format("console")
.start();
queries[i] = queryResult;
i++;
}
for (StreamingQuery q : queries) {
q.awaitTermination();
}
}
Деталь Исключение: