Для данного сценария я хочу отфильтровать наборы данных в структурированной потоковой передаче в комбинации непрерывных и пакетных триггеров.
Я знаю, это звучит нереально или, возможно, неосуществимо. Вот то, чего я пытаюсь достичь.
Пусть интервал времени обработки, установленный в приложении, составляет 5 минут. Пусть запись будет иметь следующую схему:
{
"type":"record",
"name":"event",
"fields":[
{ "name":"Student", "type":"string" },
{ "name":"Subject", "type":"string" }
]}
Предполагается, что мое потоковое приложение записывает результат в приемник с учетом любого из двух приведенных ниже критериев.
Если у студента более 5 предметов. (Приоритет будет отдан этому критерию.)
Время обработки, указанное в триггере, истекло.
private static Injection<GenericRecord, byte[]> recordInjection;
private static StructType type;
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"alarm\","
+ "\"fields\":["
+ " { \"name\":\"student\", \"type\":\"string\" },"
+ " { \"name\":\"subject\", \"type\":\"string\" }"
+ "]}";
private static Schema.Parser parser = new Schema.Parser();
private static Schema schema = parser.parse(USER_SCHEMA);
static {
recordInjection = GenericAvroCodecs.toBinary(schema);
type = (StructType) SchemaConverters.toSqlType(schema).dataType();
}
sparkSession.udf().register("deserialize", (byte[] data) -> {
GenericRecord record = recordInjection.invert(data).get();
return RowFactory.create(record.get("student").toString(), record.get("subject").toString());
}, DataTypes.createStructType(type.fields()));
Dataset<Row> ds2 = ds1
.select("value").as(Encoders.BINARY())
.selectExpr("deserialize(value) as rows")
.select("rows.*")
.selectExpr("student","subject");
StreamingQuery query1 = ds2
.writeStream()
.foreachBatch(
new VoidFunction2<Dataset<Row>, Long>() {
@Override
public void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
rowDataset.select("student,concat(',',subject)").alias("value").groupBy("student");
}
}
).format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "new_in")
.option("checkpointLocation", "checkpoint")
.outputMode("append")
.trigger(Trigger.ProcessingTime(10000))
.start();
query1.awaitTermination();
Консоль производителя Kafka:
Student:Test, Subject:x
Student:Test, Subject:y
Student:Test, Subject:z
Student:Test1, Subject:x
Student:Test2, Subject:x
Student:Test, Subject:w
Student:Test1, Subject:y
Student:Test2, Subject:y
Student:Test, Subject:v
В потребительской консоли Kafka я ожидаю, как показано ниже.
Test:{x,y,z,w,v} =>This should be the first response
Test1:{x,y} => second
Test2:{x,y} => Third by the end of processing time