Ниже структурированного водяного знака потокового кода и данных окон за 24-часовой интервал в 15-минутных слайдах.Код выдает только пустой пакет 0 в режиме добавления.В режиме обновления результаты отображаются правильно.Режим добавления необходим, поскольку приемник S3 работает только в режиме добавления.
String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
.withWatermark(eventTimeCol, slideDuration)
.groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
col(nameCol)).count();
sliding24h
.writeStream()
.format("console")
.option("truncate", false)
.option("numRows", 1000)
.outputMode(OutputMode.Append())
//.outputMode(OutputMode.Complete())
.start()
.awaitTermination();
Ниже приведен полный код теста:
public static void main(String [] args) throws StreamingQueryException {
SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
ArrayList<String> rl = new ArrayList<>();
for (int i = 0; i < 200; ++i) {
long t = 1512164314L + i * 5 * 60;
rl.add(t + ",qwer");
}
String nameCol = "name";
String eventTimeCol = "eventTime";
String eventTimestampCol = "eventTimestamp";
MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
Dataset<Row> stream = input.toDF().selectExpr(
"cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
"cast(split(value,'[,]')[1] as String) as " + nameCol);
System.out.println("isStreaming: " + stream.isStreaming());
Column eventTime = functions.to_timestamp(col(eventTimestampCol));
Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);
String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
.withWatermark(eventTimeCol, slideDuration)
.groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
col(nameCol)).count();
sliding24h
.writeStream()
.format("console")
.option("truncate", false)
.option("numRows", 1000)
.outputMode(OutputMode.Append())
//.outputMode(OutputMode.Complete())
.start()
.awaitTermination();
}