Spark водяных знаков и окон в режиме добавления - PullRequest
0 голосов
/ 23 ноября 2018

Ниже структурированного водяного знака потокового кода и данных окон за 24-часовой интервал в 15-минутных слайдах.Код выдает только пустой пакет 0 в режиме добавления.В режиме обновления результаты отображаются правильно.Режим добавления необходим, поскольку приемник S3 работает только в режиме добавления.

String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
        .withWatermark(eventTimeCol, slideDuration)
        .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                col(nameCol)).count();

sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        //.outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();

Ниже приведен полный код теста:

public static void main(String [] args) throws StreamingQueryException {
     SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

     ArrayList<String> rl = new ArrayList<>();
     for (int i = 0; i < 200; ++i) {
         long t = 1512164314L + i * 5 * 60;
         rl.add(t + ",qwer");
     }

     String nameCol = "name";
     String eventTimeCol = "eventTime";
     String eventTimestampCol = "eventTimestamp";

     MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
     Dataset<Row> stream = input.toDF().selectExpr(
             "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
             "cast(split(value,'[,]')[1] as String) as " + nameCol);

     System.out.println("isStreaming: " +  stream.isStreaming());

     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

     String windowDuration = "24 hours";
     String slideDuration = "15 minutes";
     Dataset<Row> sliding24h = rowData
             .withWatermark(eventTimeCol, slideDuration)
             .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                     col(nameCol)).count();

     sliding24h
             .writeStream()
             .format("console")
             .option("truncate", false)
             .option("numRows", 1000)
             .outputMode(OutputMode.Append())
             //.outputMode(OutputMode.Complete())
             .start()
             .awaitTermination();
}

1 Ответ

0 голосов
/ 27 ноября 2018

Это ошибка, которая была устранена в 2.4.0 См .: https://issues.apache.org/jira/browse/SPARK-26167 https://issues.apache.org/jira/browse/SPARK-24156

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...