Манипулировать интервалом запуска в искровой структурированной потоковой передаче - PullRequest
0 голосов
/ 24 октября 2019

Для данного сценария я хочу отфильтровать наборы данных в структурированной потоковой передаче в комбинации непрерывных и пакетных триггеров.

Я знаю, это звучит нереально или, возможно, неосуществимо. Вот то, чего я пытаюсь достичь.

Пусть интервал времени обработки, установленный в приложении, составляет 5 минут. Пусть запись будет иметь следующую схему:

  {
       "type":"record",
       "name":"event",
       "fields":[
         { "name":"Student", "type":"string" },
         { "name":"Subject", "type":"string" } 
   ]}

Предполагается, что мое потоковое приложение записывает результат в приемник с учетом любого из двух приведенных ниже критериев.

  1. Если у студента более 5 предметов. (Приоритет будет отдан этому критерию.)

  2. Время обработки, указанное в триггере, истекло.

    private static Injection<GenericRecord, byte[]> recordInjection;
    private static StructType type;
    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"alarm\","
            + "\"fields\":["
            + "  { \"name\":\"student\", \"type\":\"string\" },"
            + "  { \"name\":\"subject\", \"type\":\"string\" }"
            + "]}";
    
    private static Schema.Parser parser = new Schema.Parser();
    
    private static Schema schema = parser.parse(USER_SCHEMA);
    
    static {
        recordInjection = GenericAvroCodecs.toBinary(schema);
        type = (StructType) SchemaConverters.toSqlType(schema).dataType();
    
    }
    sparkSession.udf().register("deserialize", (byte[] data) -> {
            GenericRecord record = recordInjection.invert(data).get();
            return RowFactory.create(record.get("student").toString(), record.get("subject").toString());
        }, DataTypes.createStructType(type.fields()));
    
    
    Dataset<Row> ds2 = ds1
            .select("value").as(Encoders.BINARY())
            .selectExpr("deserialize(value) as rows")
            .select("rows.*")
            .selectExpr("student","subject");
    
    StreamingQuery query1 = ds2
            .writeStream()
            .foreachBatch(
                new VoidFunction2<Dataset<Row>, Long>() {
                  @Override
                  public void call(Dataset<Row> rowDataset, Long aLong) throws Exception {
                    rowDataset.select("student,concat(',',subject)").alias("value").groupBy("student");
                  }
                }
            ).format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "new_in")
            .option("checkpointLocation", "checkpoint")
            .outputMode("append")
            .trigger(Trigger.ProcessingTime(10000))
            .start();
    query1.awaitTermination();
    

Консоль производителя Kafka:

Student:Test, Subject:x
Student:Test, Subject:y
Student:Test, Subject:z
Student:Test1, Subject:x
Student:Test2, Subject:x
Student:Test, Subject:w
Student:Test1, Subject:y
Student:Test2, Subject:y
Student:Test, Subject:v

В потребительской консоли Kafka я ожидаю, как показано ниже.

Test:{x,y,z,w,v} =>This should be the first response 
Test1:{x,y} => second
Test2:{x,y} => Third by the end of processing time
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...