Я новичок на пороге и пытаюсь применить управление окнами.Мой источник - kafka, и моя модель не содержит информацию о времени события, поэтому я стараюсь использовать временные метки Kafka с методом assignTimestampsAndWatermarks ()
Я реализовал два присваивателя временных меток, как показано ниже.
public class TimestampAssigner1 implements AssignerWithPeriodicWatermarks<String> {
protected Logger logger = LoggerFactory.getLogger(getClass());
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private final long maxOutOfOrderness = 3500; // 3.5 seconds
@Override
public long extractTimestamp(String element, long previousElementTimestamp) {
currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
logger.info(String.format("TimestampAssigner1 - currentMaxTimestamp : %s res : %s, element : %s ", currentMaxTimestamp, previousElementTimestamp, element));
return previousElementTimestamp;
}
@Override
public Watermark getCurrentWatermark() {
Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
//Watermark watermarkRes = new Watermark(currentMaxTimestamp );
//Watermark watermarkRes = new Watermark(System.currentTimeMillis() );
//logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
return watermarkRes;
}
}
public class TimestampAssigner2 implements AssignerWithPeriodicWatermarks<String> {
protected Logger logger = LoggerFactory.getLogger(getClass());
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private final long maxOutOfOrderness = 3500; // 3.5 seconds
@Override
public long extractTimestamp(String element, long previousElementTimestamp) {
currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
logger.info(String.format("TimestampAssigner2 - currentMaxTimestamp : %s res : %s, element : %s ", currentMaxTimestamp, previousElementTimestamp, element));
return previousElementTimestamp;
}
@Override
public Watermark getCurrentWatermark() {
//Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
//Watermark watermarkRes = new Watermark(currentMaxTimestamp );
Watermark watermarkRes = new Watermark(System.currentTimeMillis() );
//logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
return watermarkRes;
}
}
Это то, что я наблюдаю: первый (TimestampAssigner1) не может прогрессировать, если нет новых элементов из источника kafka.Я действительно могу проверить это поведение, элемент получен, но окно не завершается в отсутствие новых элементов.Второй (TimestampAssigner2), кажется, продвигается хорошо, но, насколько я понимаю, поскольку я использую системное время, задержанные элементы не будут обрабатываться, поскольку они не будут включены в окна.
Что должно быть правильным способомсправиться с этой ситуацией?Мое требование - своевременно обрабатывать все события.
С уважением