Apache flink - генераторы водяных знаков на основе времени - оптимальная стратегия - PullRequest
0 голосов
/ 30 января 2019

Я новичок на пороге и пытаюсь применить управление окнами.Мой источник - kafka, и моя модель не содержит информацию о времени события, поэтому я стараюсь использовать временные метки Kafka с методом assignTimestampsAndWatermarks ()

Я реализовал два присваивателя временных меток, как показано ниже.

public class TimestampAssigner1 implements AssignerWithPeriodicWatermarks<String> {
    protected Logger          logger = LoggerFactory.getLogger(getClass());

    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
        logger.info(String.format("TimestampAssigner1 - currentMaxTimestamp : %s res : %s, element : %s ", currentMaxTimestamp, previousElementTimestamp, element));
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        //Watermark watermarkRes = new Watermark(currentMaxTimestamp );
        //Watermark watermarkRes = new Watermark(System.currentTimeMillis() );
        //logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
        return watermarkRes;
    }
}


public class TimestampAssigner2 implements AssignerWithPeriodicWatermarks<String> {
    protected Logger          logger = LoggerFactory.getLogger(getClass());

    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(previousElementTimestamp, currentMaxTimestamp);
        logger.info(String.format("TimestampAssigner2 - currentMaxTimestamp : %s res : %s, element : %s ", currentMaxTimestamp, previousElementTimestamp, element));
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        //Watermark watermarkRes = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        //Watermark watermarkRes = new Watermark(currentMaxTimestamp );
        Watermark watermarkRes = new Watermark(System.currentTimeMillis() );
        //logger.info(String.format("watermarkRes : %s , this : %s ", watermarkRes, this));
        return watermarkRes;
    }
}

Это то, что я наблюдаю: первый (TimestampAssigner1) не может прогрессировать, если нет новых элементов из источника kafka.Я действительно могу проверить это поведение, элемент получен, но окно не завершается в отсутствие новых элементов.Второй (TimestampAssigner2), кажется, продвигается хорошо, но, насколько я понимаю, поскольку я использую системное время, задержанные элементы не будут обрабатываться, поскольку они не будут включены в окна.

Что должно быть правильным способомсправиться с этой ситуацией?Мое требование - своевременно обрабатывать все события.

С уважением

...